You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/31 23:22:34 UTC

cassandra git commit: Restore performance of writeUTF; follow up commit to CASSANDRA-8670

Repository: cassandra
Updated Branches:
  refs/heads/trunk 230cca580 -> 0352a15a3


Restore performance of writeUTF; follow up commit to CASSANDRA-8670

patch by ariel and benedict for CASSANDRA-8670


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0352a15a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0352a15a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0352a15a

Branch: refs/heads/trunk
Commit: 0352a15a318e8121f8ec977d28379961a9aec387
Parents: 230cca5
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Mar 31 22:21:43 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Mar 31 22:21:43 2015 +0100

----------------------------------------------------------------------
 .../cassandra/io/util/DataOutputStreamPlus.java |   2 +-
 .../io/util/UnbufferedDataOutputStreamPlus.java |  99 +++----
 .../test/microbench/OutputStreamBench.java      | 274 +++++++++++++++++++
 .../io/util/BufferedDataOutputStreamTest.java   |  26 +-
 4 files changed, 342 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0352a15a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
index 6de2879..a846384 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
@@ -55,7 +55,7 @@ public abstract class DataOutputStreamPlus extends OutputStream implements DataO
     protected static byte[] retrieveTemporaryBuffer(int minSize)
     {
         byte[] bytes = tempBuffer.get();
-        if (bytes.length < minSize)
+        if (bytes.length < Math.min(minSize, MAX_BUFFER_SIZE))
         {
             // increase in powers of 2, to avoid wasted repeat allocations
             bytes = new byte[Math.min(MAX_BUFFER_SIZE, 2 * Integer.highestOneBit(minSize))];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0352a15a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
index 31abfa8..ac3bae5 100644
--- a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -252,81 +252,72 @@ public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlu
     public static void writeUTF(String str, DataOutput out) throws IOException
     {
         int length = str.length();
-        int utfCount = calculateUTFLength(str, length);
+        int utfCount = 0;
+        for (int i = 0 ; i < length ; i++)
+        {
+            int ch = str.charAt(i);
+            if ((ch > 0) & (ch <= 127))
+                utfCount += 1;
+            else if (ch <= 2047)
+                utfCount += 2;
+            else
+                utfCount += 3;
+        }
 
         if (utfCount > 65535)
             throw new UTFDataFormatException(); //$NON-NLS-1$
 
         byte[] utfBytes = retrieveTemporaryBuffer(utfCount + 2);
 
-        int utfIndex = 2;
-        utfBytes[0] = (byte) (utfCount >> 8);
-        utfBytes[1] = (byte) utfCount;
         int bufferLength = utfBytes.length;
-
-        if (utfCount == length && utfCount + utfIndex < bufferLength)
+        if (utfCount == length)
         {
-            for (int charIndex = 0 ; charIndex < length ; charIndex++)
-                utfBytes[utfIndex++] = (byte) str.charAt(charIndex);
+            utfBytes[0] = (byte) (utfCount >> 8);
+            utfBytes[1] = (byte) utfCount;
+            int firstIndex = 2;
+            for (int offset = 0 ; offset < length ; offset += bufferLength)
+            {
+                int runLength = Math.min(bufferLength - firstIndex, length - offset) + firstIndex;
+                offset -= firstIndex;
+                for (int i = firstIndex ; i < runLength; i++)
+                    utfBytes[i] = (byte) str.charAt(offset + i);
+                out.write(utfBytes, 0, runLength);
+                offset += firstIndex;
+                firstIndex = 0;
+            }
         }
         else
         {
-            int charIndex = 0;
-            while (charIndex < length)
+            int utfIndex = 2;
+            utfBytes[0] = (byte) (utfCount >> 8);
+            utfBytes[1] = (byte) utfCount;
+            for (int charIndex = 0 ; charIndex < length ; charIndex++)
             {
-                char ch = str.charAt(charIndex);
-                int sizeOfChar = sizeOfChar(ch);
-                if (utfIndex + sizeOfChar > bufferLength)
+                if (utfIndex + 3 > bufferLength)
                 {
                     out.write(utfBytes, 0, utfIndex);
                     utfIndex = 0;
                 }
 
-                switch (sizeOfChar)
+                char ch = str.charAt(charIndex);
+                if ((ch > 0) & (ch <= 127))
+                {
+                    utfBytes[utfIndex++] = (byte) ch;
+                }
+                else if (ch <= 2047)
                 {
-                    case 3:
-                        utfBytes[utfIndex] = (byte) (0xe0 | (0x0f & (ch >> 12)));
-                        utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & (ch >> 6)));
-                        utfBytes[utfIndex + 2] = (byte) (0x80 | (0x3f & ch));
-                        break;
-                    case 2:
-                        utfBytes[utfIndex] = (byte) (0xc0 | (0x1f & (ch >> 6)));
-                        utfBytes[utfIndex + 1] = (byte) (0x80 | (0x3f & ch));
-                        break;
-                    case 1:
-                        utfBytes[utfIndex] = (byte) ch;
-                        break;
-                    default:
-                        throw new IllegalStateException();
+                    utfBytes[utfIndex++] = (byte) (0xc0 | (0x1f & (ch >> 6)));
+                    utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & ch));
+                }
+                else
+                {
+                    utfBytes[utfIndex++] = (byte) (0xe0 | (0x0f & (ch >> 12)));
+                    utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & (ch >> 6)));
+                    utfBytes[utfIndex++] = (byte) (0x80 | (0x3f & ch));
                 }
-                utfIndex += sizeOfChar;
-                charIndex++;
             }
+            out.write(utfBytes, 0, utfIndex);
         }
-        out.write(utfBytes, 0, utfIndex);
-    }
-
-    /*
-     * Factored out into separate method to create more flexibility around inlining
-     */
-    private static int calculateUTFLength(String str, int length)
-    {
-        int utfCount = 0;
-        for (int i = 0; i < length; i++)
-            utfCount += sizeOfChar(str.charAt(i));
-        return utfCount;
-    }
-
-    private static int sizeOfChar(int ch)
-    {
-        // wrap 0 around to max, because it requires 3 bytes
-        return 1
-               // if >= 128, we need an extra byte, so we divide by 128 and check the value is > 0
-               // (by negating it and taking the sign bit)
-               + (-(ch / 128) >>> 31)
-               // if >= 2048, or == 0, we need another extra byte; we subtract one and wrap around,
-               // so we only then need to confirm it is greater than 2047
-               + (-(((ch - 1) & 0xffff) / 2047) >>> 31);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0352a15a/test/microbench/org/apache/cassandra/test/microbench/OutputStreamBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/OutputStreamBench.java b/test/microbench/org/apache/cassandra/test/microbench/OutputStreamBench.java
new file mode 100644
index 0000000..b8136f7
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/OutputStreamBench.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamTest;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.openjdk.jmh.annotations.*;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 3,jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class OutputStreamBench
+{
+
+    BufferedOutputStream hole = new BufferedOutputStream(new OutputStream() {
+
+        @Override
+        public void write(int b) throws IOException
+        {
+
+        }
+
+        @Override
+        public void write(byte b[]) throws IOException {
+
+        }
+
+        @Override
+        public void write(byte b[], int a, int c) throws IOException {
+
+        }
+        });
+
+    WrappedDataOutputStreamPlus streamA = new WrappedDataOutputStreamPlus(hole);
+
+    BufferedDataOutputStreamPlus streamB = new BufferedDataOutputStreamPlus(new WritableByteChannel() {
+
+        @Override
+        public boolean isOpen()
+        {
+            // TODO Auto-generated method stub
+            return true;
+        }
+
+        @Override
+        public void close() throws IOException
+        {
+            // TODO Auto-generated method stub
+
+        }
+
+        @Override
+        public int write(ByteBuffer src) throws IOException
+        {
+            int remaining = src.remaining();
+            src.position(src.limit());
+            return remaining;
+        }
+
+    }, 8192);
+
+    public static byte foo;
+
+    public static int foo1;
+
+    public static long foo2;
+
+    public static double foo3;
+
+    public static float foo4;
+
+    public static short foo5;
+
+    public static char foo6;
+
+    @Benchmark
+    public void testBOSByte() throws IOException
+    {
+        streamA.write(foo);
+    }
+
+    @Benchmark
+    public void testBDOSPByte() throws IOException
+    {
+        streamB.write(foo);
+    }
+
+    @Benchmark
+    public void testBOSInt() throws IOException
+    {
+        streamA.writeInt(foo1);
+    }
+
+    @Benchmark
+    public void testBDOSPInt() throws IOException
+    {
+        streamB.writeInt(foo1);
+    }
+
+    @Benchmark
+    public void testBOSLong() throws IOException
+    {
+        streamA.writeLong(foo2);
+    }
+
+    @Benchmark
+    public void testBDOSPLong() throws IOException
+    {
+        streamB.writeLong(foo2);
+    }
+
+    @Benchmark
+    public void testBOSMixed() throws IOException
+    {
+        streamA.write(foo);
+        streamA.writeInt(foo1);
+        streamA.writeLong(foo2);
+        streamA.writeDouble(foo3);
+        streamA.writeFloat(foo4);
+        streamA.writeShort(foo5);
+        streamA.writeChar(foo6);
+    }
+
+    @Benchmark
+    public void testBDOSPMixed() throws IOException
+    {
+        streamB.write(foo);
+        streamB.writeInt(foo1);
+        streamB.writeLong(foo2);
+        streamB.writeDouble(foo3);
+        streamB.writeFloat(foo4);
+        streamB.writeShort(foo5);
+        streamB.writeChar(foo6);
+    }
+
+    public static String tinyM = "𠝹";
+    public static String smallM = "𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ𠝹㒨ƀ";
+    public static String largeM;
+    public static String tiny = "a";
+    public static String small = "adsjglhnafsjk;gujfakyhgukafshgjkahfsgjkhafs;jkhausjkgaksfj;gafskdghajfsk;g";
+    public static String large;
+
+    static {
+        StringBuilder sb = new StringBuilder();
+        while (sb.length() < 1024 * 12) {
+            sb.append(small);
+        }
+        large = sb.toString();
+
+        sb = new StringBuilder();
+        while (sb.length() < 1024 * 12) {
+            sb.append(smallM);
+        }
+        largeM = sb.toString();
+    }
+
+    @Benchmark
+    public void testMTinyStringBOS() throws IOException {
+        streamA.writeUTF(tinyM);
+    }
+
+    @Benchmark
+    public void testMTinyStringBDOSP() throws IOException {
+        streamB.writeUTF(tinyM);
+    }
+
+    @Benchmark
+    public void testMTinyLegacyWriteUTF() throws IOException {
+        BufferedDataOutputStreamTest.writeUTFLegacy(tinyM, hole);
+    }
+
+    @Benchmark
+    public void testMSmallStringBOS() throws IOException {
+        streamA.writeUTF(smallM);
+    }
+
+    @Benchmark
+    public void testMSmallStringBDOSP() throws IOException {
+        streamB.writeUTF(smallM);
+    }
+
+    @Benchmark
+    public void testMSmallLegacyWriteUTF() throws IOException {
+        BufferedDataOutputStreamTest.writeUTFLegacy(smallM, hole);
+    }
+
+    @Benchmark
+    public void testMLargeStringBOS() throws IOException {
+        streamA.writeUTF(largeM);
+    }
+
+    @Benchmark
+    public void testMLargeStringBDOSP() throws IOException {
+        streamB.writeUTF(largeM);
+    }
+
+    @Benchmark
+    public void testMLargeLegacyWriteUTF() throws IOException {
+        BufferedDataOutputStreamTest.writeUTFLegacy(largeM, hole);
+    }
+
+    @Benchmark
+    public void testTinyStringBOS() throws IOException {
+        streamA.writeUTF(tiny);
+    }
+
+    @Benchmark
+    public void testTinyStringBDOSP() throws IOException {
+        streamB.writeUTF(tiny);
+    }
+
+    @Benchmark
+    public void testTinyLegacyWriteUTF() throws IOException {
+        BufferedDataOutputStreamTest.writeUTFLegacy(tiny, hole);
+    }
+
+    @Benchmark
+    public void testSmallStringBOS() throws IOException {
+        streamA.writeUTF(small);
+    }
+
+    @Benchmark
+    public void testSmallStringBDOSP() throws IOException {
+        streamB.writeUTF(small);
+    }
+
+    @Benchmark
+    public void testSmallLegacyWriteUTF() throws IOException {
+        BufferedDataOutputStreamTest.writeUTFLegacy(small, hole);
+    }
+
+    @Benchmark
+    public void testRLargeStringBOS() throws IOException {
+        streamA.writeUTF(large);
+    }
+
+    @Benchmark
+    public void testRLargeStringBDOSP() throws IOException {
+        streamB.writeUTF(large);
+    }
+
+    @Benchmark
+    public void testRLargeLegacyWriteUTF() throws IOException {
+        BufferedDataOutputStreamTest.writeUTFLegacy(large, hole);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0352a15a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
index 8ac6d92..8eaea31 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java
@@ -3,6 +3,7 @@ package org.apache.cassandra.io.util;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
@@ -148,7 +149,7 @@ public class BufferedDataOutputStreamTest
         int action = 0;
         while (generated.size() < 1024 * 1024 * 8)
         {
-            action = r.nextInt(18);
+            action = r.nextInt(19);
 
             //System.out.println("Action " + action + " iteration " + iteration);
             iteration++;
@@ -258,6 +259,9 @@ public class BufferedDataOutputStreamTest
             {
                 StringBuilder sb = new StringBuilder();
                 int length = r.nextInt(500);
+                //Some times do big strings
+                if (r.nextDouble() > .95)
+                    length += 4000;
                 sb.append(simple + twoByte + threeByte + fourByte);
                 for (int ii = 0; ii < length; ii++)
                 {
@@ -270,6 +274,20 @@ public class BufferedDataOutputStreamTest
             }
             case 15:
             {
+                StringBuilder sb = new StringBuilder();
+                int length = r.nextInt(500);
+                sb.append("the very model of a modern major general familiar with all things animal vegetable and mineral");
+                for (int ii = 0; ii < length; ii++)
+                {
+                    sb.append(' ');
+                }
+                String str = sb.toString();
+                writeUTFLegacy(str, dosp);
+                ndosp.writeUTF(str);
+                break;
+            }
+            case 16:
+            {
                 ByteBuffer buf = ByteBuffer.allocate(r.nextInt(1024 * 8 + 1));
                 r.nextBytes(buf.array());
                 buf.position(buf.capacity() == 0 ? 0 : r.nextInt(buf.capacity()));
@@ -281,7 +299,7 @@ public class BufferedDataOutputStreamTest
                 dosp.write(buf.duplicate());
                 break;
             }
-            case 16:
+            case 17:
             {
                 ByteBuffer buf = ByteBuffer.allocateDirect(r.nextInt(1024 * 8 + 1));
                 while (buf.hasRemaining())
@@ -295,7 +313,7 @@ public class BufferedDataOutputStreamTest
                 dosp.write(buf.duplicate());
                 break;
             }
-            case 17:
+            case 18:
             {
                 try (Memory buf = Memory.allocate(r.nextInt(1024 * 8 - 1) + 1);)
                 {
@@ -317,7 +335,7 @@ public class BufferedDataOutputStreamTest
         assertSameOutput(0, -1, iteration);
     }
 
-    static void writeUTFLegacy(String str, DataOutput out) throws IOException
+    public static void writeUTFLegacy(String str, OutputStream out) throws IOException
     {
         int utfCount = 0, length = str.length();
         for (int i = 0; i < length; i++)