Posted to jira@kafka.apache.org by "divijvaidya (via GitHub)" <gi...@apache.org> on 2023/02/27 17:28:41 UTC

[GitHub] [kafka] divijvaidya opened a new pull request, #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

divijvaidya opened a new pull request, #13312:
URL: https://github.com/apache/kafka/pull/13312

   # Motivation
   Reading and writing the protocol buffer varInt32 and varInt64 (called varLong in our code base) is in the hot path of data plane code in Apache Kafka. Every record contains multiple varints and varlongs, so even a minor per-call improvement can add up to a larger overall performance benefit.
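   
   For reference, an unsigned varint stores 7 payload bits per byte, lowest-order group first, and sets the high bit (0x80) on every byte except the last. The sketch below is a plain loop-based encoder and decoder for `int` values over `byte[]`; the class and method names are hypothetical and are not Kafka's `ByteUtils` API.
   
   ```java
   import java.io.ByteArrayOutputStream;

   final class VarintFormat {

       // Encodes value as an unsigned varint: 7 bits per byte, lowest-order group first,
       // with the continuation bit (0x80) set on every byte except the last.
       static byte[] encodeUnsigned(int value) {
           ByteArrayOutputStream out = new ByteArrayOutputStream(5);
           while ((value & ~0x7F) != 0) {
               out.write((value & 0x7F) | 0x80); // 7 payload bits + continuation bit
               value >>>= 7;                     // unsigned shift, so negative ints take 5 bytes
           }
           out.write(value);                     // final byte (< 128), continuation bit clear
           return out.toByteArray();
       }

       // Decodes an unsigned varint from the start of bytes; rejects truncated input
       // and encodings longer than 5 bytes.
       static int decodeUnsigned(byte[] bytes) {
           int value = 0;
           for (int i = 0, shift = 0; i < bytes.length && shift <= 28; i++, shift += 7) {
               int b = bytes[i];
               value |= (b & 0x7F) << shift;
               if ((b & 0x80) == 0) {
                   return value;
               }
           }
           throw new IllegalArgumentException("varint is truncated or longer than 5 bytes");
       }

       public static void main(String[] args) {
           byte[] encoded = encodeUnsigned(300);
           System.out.printf("%02X %02X%n", encoded[0], encoded[1]);
           System.out.println(decodeUnsigned(encoded));
       }
   }
   ```
   
   For example, 300 encodes to the two bytes `0xAC 0x02`: the low 7 bits (0x2C) with the continuation bit set, followed by the remaining value 2.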
   
   # Changes
   This change unrolls the encoding and decoding loops and reduces repeated calculations.
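   
   As a rough illustration of what unrolling means here, the sketch below writes an unsigned varint into a `ByteBuffer` with one branch per possible encoded length (1 to 5 bytes) instead of a `while` loop. It follows the general idea described in the Steinborn blog post listed under References, but `UnrolledVarintWriter` is a hypothetical standalone class, not the code merged into `ByteUtils`.
   
   ```java
   import java.nio.ByteBuffer;

   final class UnrolledVarintWriter {

       // Writes value as an unsigned varint with the loop fully unrolled: each branch
       // handles one encoded length, so there is no loop counter or running shift.
       static void writeUnsignedVarint(int value, ByteBuffer buffer) {
           if ((value & (0xFFFFFFFF << 7)) == 0) {
               buffer.put((byte) value);
           } else if ((value & (0xFFFFFFFF << 14)) == 0) {
               buffer.put((byte) ((value & 0x7F) | 0x80));
               buffer.put((byte) (value >>> 7));
           } else if ((value & (0xFFFFFFFF << 21)) == 0) {
               buffer.put((byte) ((value & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
               buffer.put((byte) (value >>> 14));
           } else if ((value & (0xFFFFFFFF << 28)) == 0) {
               buffer.put((byte) ((value & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 14) & 0x7F) | 0x80));
               buffer.put((byte) (value >>> 21));
           } else {
               // 5 bytes: values >= 2^28, including all negative ints
               buffer.put((byte) ((value & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 7) & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 14) & 0x7F) | 0x80));
               buffer.put((byte) (((value >>> 21) & 0x7F) | 0x80));
               buffer.put((byte) (value >>> 28));
           }
       }

       public static void main(String[] args) {
           ByteBuffer buffer = ByteBuffer.allocate(5);
           writeUnsignedVarint(300, buffer);
           System.out.println(buffer.position()); // 2 bytes written: 0xAC 0x02
       }
   }
   ```
   
   Each branch is straight-line code with constant shift amounts; the cost is more code to read and maintain than the equivalent three-line loop.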
   
   # Results
   Performance has been evaluated using JMH benchmarks. The benefit of loop unrolling diminishes as the loop gets longer (from at most 5 iterations for Int32 to at most 10 for Int64).
   
   ```
   ByteUtilsBenchmark.testUnsignedReadVarintNew       thrpt    5   51187.160 ±  778.033  ops/s
   ByteUtilsBenchmark.testUnsignedReadVarintOld       thrpt    5   43441.115 ± 1281.592  ops/s
   
   ByteUtilsBenchmark.testUnsignedReadVarlongNew      thrpt    5   28293.582 ±  952.091  ops/s
   ByteUtilsBenchmark.testUnsignedReadVarlongOld      thrpt    5   22734.384 ± 1132.111  ops/s
   
   ByteUtilsBenchmark.testUnsignedWriteVarintNew      thrpt    5  134848.804 ± 2464.908  ops/s
   ByteUtilsBenchmark.testUnsignedWriteVarintOld      thrpt    5   73333.919 ± 3306.315  ops/s
   
   ByteUtilsBenchmark.testUnsignedWriteVarlongNew     thrpt    5   40252.868 ± 2551.431  ops/s
   ByteUtilsBenchmark.testUnsignedWriteVarlongOld     thrpt    5   34948.229 ± 1877.987  ops/s
   ```
   
   # Testing
   New tests validate correctness by checking that the new implementation produces the same values as the prior algorithm.
   
   Unit and integration tests pass locally.
   
   # References
   - https://steinborn.me/posts/performance/how-fast-can-you-write-a-varint/ is a blog post that explains the optimizations for writing varInt32.
   - https://github.com/netty/netty/blob/59aa6e635b9996cf21cd946e64353270679adc73/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32FrameDecoder.java#L73 is Netty's unrolled implementation of readVarInt32
   - https://github.com/protocolbuffers/protobuf/blob/22.x/java/core/src/main/java/com/google/protobuf/CodedOutputStream.java#L1345 is Protobuf's loop-based implementation
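   
   Netty's decoder linked above checks each byte in nested `if`s rather than looping. The following is a hypothetical sketch of that shape for a `ByteBuffer`, loosely modelled on the approach rather than copied from Netty or Kafka; it relies on the fact that a Java `byte` is negative exactly when its continuation bit (0x80) is set.
   
   ```java
   import java.nio.ByteBuffer;

   final class UnrolledVarintReader {

       // Reads an unsigned varint (1 to 5 bytes) without a loop.
       // A non-negative byte has its continuation bit clear, i.e. it is the last byte.
       static int readUnsignedVarint(ByteBuffer buffer) {
           byte tmp = buffer.get();
           if (tmp >= 0) {
               return tmp;                                  // 1 byte
           }
           int result = tmp & 0x7F;
           if ((tmp = buffer.get()) >= 0) {
               return result | (tmp << 7);                  // 2 bytes
           }
           result |= (tmp & 0x7F) << 7;
           if ((tmp = buffer.get()) >= 0) {
               return result | (tmp << 14);                 // 3 bytes
           }
           result |= (tmp & 0x7F) << 14;
           if ((tmp = buffer.get()) >= 0) {
               return result | (tmp << 21);                 // 4 bytes
           }
           result |= (tmp & 0x7F) << 21;
           tmp = buffer.get();
           if (tmp < 0) {
               // a fifth byte that still has the continuation bit set is malformed
               throw new IllegalArgumentException("varint is longer than 5 bytes");
           }
           return result | (tmp << 28);                     // 5 bytes
       }

       public static void main(String[] args) {
           ByteBuffer buffer = ByteBuffer.wrap(new byte[] {(byte) 0xAC, 0x02});
           System.out.println(readUnsignedVarint(buffer)); // 300
       }
   }
   ```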


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1454952744

   @divijvaidya The following is also interesting https://github.com/astei/varint-writing-showdown/issues/1




[GitHub] [kafka] showuon merged pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon merged PR #13312:
URL: https://github.com/apache/kafka/pull/13312




[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1495898660

   > There is a Hollow VarInt implementation here that is similarly unrolled on the `write` path but is, I think, a little easier to read!
   > 
   > https://github.com/Netflix/hollow/blob/master/hollow/src/main/java/com/netflix/hollow/core/memory/encoding/VarInt.java#L51-L134
   > 
   > @dkoszewnik -- do you have an opinion on this? I'm sure you've experimented with multiple implementations.
   
   @jasonk000 Hollow's implementation is based on the assumption that the system is little endian, but the JVM defaults to big endian. Hence, it would not be directly compatible with Apache Kafka code.
   
   
   
   > @divijvaidya The following is also interesting [astei/varint-writing-showdown#1](https://github.com/astei/varint-writing-showdown/issues/1)
   
   @ijuma I tested this, but it was much slower than the rest of the implementations.




[GitHub] [kafka] jasonk000 commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "jasonk000 (via GitHub)" <gi...@apache.org>.
jasonk000 commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1455212036

   There is a Hollow VarInt implementation here that is similarly unrolled on the `write` path but is, I think, a little easier to read!
   
   https://github.com/Netflix/hollow/blob/master/hollow/src/main/java/com/netflix/hollow/core/memory/encoding/VarInt.java#L51-L134
   
   cc @dkoszewnik -- do you have an opinion on this? I'm sure you've experimented with multiple implementations.




[GitHub] [kafka] ijuma commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120217103


##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -292,29 +415,66 @@ public static double readDouble(ByteBuffer buffer) {
      * @param buffer The output to write to
      */
     public static void writeUnsignedVarint(int value, ByteBuffer buffer) {

Review Comment:
   What does Netty do for this? Something similar to what we're doing here, or something else?



##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -150,17 +151,32 @@ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
      * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
      */
     public static int readUnsignedVarint(ByteBuffer buffer) {

Review Comment:
   Shall we have a comment indicating that we borrowed the implementation from Netty with a link to the relevant file?



##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
      * @throws IOException              if {@link DataInput} throws {@link IOException}
      */
     public static long readVarlong(DataInput in) throws IOException {
-        long value = 0L;
-        int i = 0;
-        long b;
-        while (((b = in.readByte()) & 0x80) != 0) {
-            value |= (b & 0x7f) << i;
-            i += 7;
-            if (i > 63)
-                throw illegalVarlongException(value);
+        long raw = readUnsignedVarlong(in);
+        return (raw >>> 1) ^ -(raw & 1);
+    }
+
+    private static long readUnsignedVarlong(DataInput in) throws IOException {
+        byte tmp = in.readByte();

Review Comment:
   I saw a link to the Netty implementation for varint32, is there one for varint64 too?
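   
   For context on the `readVarlong` hunk above: after reading the unsigned value, it undoes ZigZag encoding with `(raw >>> 1) ^ -(raw & 1)`, which maps signed values to unsigned ones so that small negative numbers also get short encodings. A minimal sketch pairing that expression with the usual ZigZag encode `(n << 1) ^ (n >> 63)` (the class name is hypothetical):
   
   ```java
   final class ZigZag {

       // ZigZag-encode a signed long: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
       // (n >> 63) is all ones for negative n and all zeros otherwise.
       static long encode(long n) {
           return (n << 1) ^ (n >> 63);
       }

       // Inverse mapping; the same expression readVarlong applies to the raw value.
       static long decode(long raw) {
           return (raw >>> 1) ^ -(raw & 1);
       }

       public static void main(String[] args) {
           for (long n : new long[] {0, -1, 1, -2, 2, Long.MIN_VALUE, Long.MAX_VALUE}) {
               System.out.println(n + " -> " + encode(n) + " -> " + decode(encode(n)));
           }
       }
   }
   ```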





[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1515034487

   @ijuma I tried end-to-end benchmarks with and without this change, but I didn't observe any noticeable change in end-to-end latency or throughput (because the bottleneck in end-to-end tests is elsewhere). Hence, I can't demonstrate the impact of this change beyond microbenchmarks for different byte sizes. I have reverted the change for varlong. Please let me know whether you are willing to merge this for varint32 based on the microbenchmark numbers.
   
   




[GitHub] [kafka] showuon commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1182330077


##########
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##########
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
         assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
     }
 
+    @Test
+    public void testCorrectnessWriteUnsignedVarlong() {

Review Comment:
   Should we remove or disable this test?



##########
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##########
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
         assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
     }
 
+    @Test
+    public void testCorrectnessWriteUnsignedVarlong() {
+        // The old well-known implementation for writeVarlong.
+        LongFunction<ByteBuffer> simpleImplementation = (long value) -> {
+            ByteBuffer buffer = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+            while ((value & 0xffffffffffffff80L) != 0L) {
+                byte b = (byte) ((value & 0x7f) | 0x80);
+                buffer.put(b);
+                value >>>= 7;
+            }
+            buffer.put((byte) value);
+
+            return buffer;
+        };
+
+        // compare the full range of values
+        final ByteBuffer actual = ByteBuffer.allocate(MAX_LENGTH_VARLONG);
+        for (long i = 1; i < Long.MAX_VALUE && i >= 0; i = i << 1) {
+            ByteUtils.writeUnsignedVarlong(i, actual);
+            final ByteBuffer expected = simpleImplementation.apply(i);
+            assertArrayEquals(expected.array(), actual.array(), "Implementations do not match for number=" + i);
+            actual.clear();
+        }
+    }
+
+    @Test
+    public void testCorrectnessWriteUnsignedVarint() {
+        // The old well-known implementation for writeUnsignedVarint.
+        IntFunction<ByteBuffer> simpleImplementation = (int value) -> {
+            ByteBuffer buffer = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+            while (true) {
+                if ((value & ~0x7F) == 0) {
+                    buffer.put((byte) value);
+                    break;
+                } else {
+                    buffer.put((byte) ((value & 0x7F) | 0x80));
+                    value >>>= 7;
+                }
+            }
+
+            return buffer;
+        };
+
+        // compare the full range of values
+        final ByteBuffer actual = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+        for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) {
+            ByteUtils.writeUnsignedVarint(i, actual);
+            final ByteBuffer expected = simpleImplementation.apply(i);
+            assertArrayEquals(expected.array(), actual.array(), "Implementations do not match for integer=" + i);
+            actual.clear();
+        }
+    }
+
+    @Test
+    public void testCorrectnessReadUnsignedVarint() {
+        // The old well-known implementation for readUnsignedVarint
+        Function<ByteBuffer, Integer> simpleImplementation = (ByteBuffer buffer) -> {
+            int value = 0;
+            int i = 0;
+            int b;
+            while (((b = buffer.get()) & 0x80) != 0) {
+                value |= (b & 0x7f) << i;
+                i += 7;
+                if (i > 28)
+                    throw new IllegalArgumentException("Invalid varint");
+            }
+            value |= b << i;
+            return value;
+        };
+
+        // compare the full range of values
+        final ByteBuffer testData = ByteBuffer.allocate(MAX_LENGTH_VARINT);
+        for (int i = 0; i < Integer.MAX_VALUE && i >= 0; i += 13) {
+            ByteUtils.writeUnsignedVarint(i, testData);
+            // prepare buffer for reading
+            testData.flip();
+            final int actual = ByteUtils.readUnsignedVarint(testData.duplicate());
+            final int expected = simpleImplementation.apply(testData);
+            assertEquals(expected, actual);
+            testData.clear();
+        }
+    }
+
+    @Test
+    public void testCorrectnessReadUnsignedVarlong() {

Review Comment:
   ditto





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120323557


##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -150,17 +151,32 @@ public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
      * @throws IllegalArgumentException if variable-length value does not terminate after 5 bytes have been read
      */
     public static int readUnsignedVarint(ByteBuffer buffer) {

Review Comment:
   Sure, will do. 
   
   Also, protobuf has a similar unrolled implementation in its C++ code at https://github.com/protocolbuffers/protobuf/blob/2dc5338ea222e1f4e0357e46b702ed6a0e82aaeb/src/google/protobuf/io/coded_stream.cc#L422 (its Java variant doesn't use the unrolled implementation, since they chose a different implementation that favours the case where varints are 1 byte).
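   
   The 1-byte-favouring alternative mentioned above can be sketched as a small fast path that returns as soon as the first byte has its continuation bit clear, with a separate loop for longer values. This is a hypothetical illustration of the idea, not protobuf's actual Java implementation.
   
   ```java
   import java.nio.ByteBuffer;

   final class FastPathVarintReader {

       // Fast path: values 0..127 are a single byte with the high bit clear.
       static int readUnsignedVarint(ByteBuffer buffer) {
           byte first = buffer.get();
           if (first >= 0) {
               return first;
           }
           return readRemaining(first & 0x7F, buffer);
       }

       // Slow path: continue decoding after the first byte, up to 5 bytes in total.
       private static int readRemaining(int value, ByteBuffer buffer) {
           for (int shift = 7; shift <= 28; shift += 7) {
               byte b = buffer.get();
               value |= (b & 0x7F) << shift;
               if (b >= 0) {
                   return value;
               }
           }
           throw new IllegalArgumentException("varint is longer than 5 bytes");
       }
   }
   ```
   
   Keeping the slow path in a separate method keeps the hot method small, which can help the JIT inline it; whether this beats full unrolling depends on the real distribution of varint sizes.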





[GitHub] [kafka] showuon commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1185666434


##########
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##########
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
         assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
     }
 
+    @Test
+    public void testCorrectnessWriteUnsignedVarlong() {

Review Comment:
   I agree. So maybe we can `disable` them for now and add a comment for future use, like what we did [here](https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/network/SocketServerTest.scala#L1410). After all, the current varLong test just runs the same logic and verifies that both produce the same results, right?





[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1448299260

   > It is interesting that unrolling the loop while keeping the same underlying logic yields this increase in throughput for writing varints
   
   Yes, indeed. In code paths that are not hot, this would be over-optimization, and we may just want to leave unrolling to the compiler. But in hot code paths such as the one in this PR, manual unrolling can speed up cases that the compiler does not optimize well enough.
   
   As an example of a prior change of a similar nature, see: https://github.com/apache/kafka/pull/11721 (that PR claims a 6% CPU improvement in a real workload).
   
   > What CPU architectures have this been tested on?
   
   The processor for the above numbers is `Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz`. I have also added numbers for ARM now.
   
   > Do we have any data on how these gains translate in latency reduction with real requests from canonical workloads?
   
   I haven't run that benchmark yet. I believe the JMH benchmark gains should be sufficient justification to merge this change. But as an aside, yes, I do plan to benchmark the latency reduction later, once my other performance-related PRs (there are some compression ones open) have also been merged.




[GitHub] [kafka] ijuma commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120470741


##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
      * @throws IOException              if {@link DataInput} throws {@link IOException}
      */
     public static long readVarlong(DataInput in) throws IOException {
-        long value = 0L;
-        int i = 0;
-        long b;
-        while (((b = in.readByte()) & 0x80) != 0) {
-            value |= (b & 0x7f) << i;
-            i += 7;
-            if (i > 63)
-                throw illegalVarlongException(value);
+        long raw = readUnsignedVarlong(in);
+        return (raw >>> 1) ^ -(raw & 1);
+    }
+
+    private static long readUnsignedVarlong(DataInput in) throws IOException {
+        byte tmp = in.readByte();

Review Comment:
   That's interesting. Can we update the JMH benchmark to have variants where (1) the max varint fits within one byte and (2) the max varint fits within two bytes? I think those are by far the most common cases.
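   
   One way to build inputs for such variants is to draw values from the range that encodes to exactly N bytes: an unsigned varint32 takes N bytes for values in [2^(7(N-1)), 2^(7N) - 1] when 2 ≤ N ≤ 4, 1 byte for 0 to 127, and 5 bytes for everything from 2^28 upward (including negative ints). The helper below is a hypothetical sketch using `java.util.Random`; it is not the JMH benchmark that was added to the PR, and it only generates non-negative values.
   
   ```java
   import java.util.Random;

   final class VarintBenchmarkData {

       // Encoded length of an unsigned varint32: one byte per started group of 7 significant bits.
       static int sizeOfUnsignedVarint(int value) {
           int significantBits = 32 - Integer.numberOfLeadingZeros(value);
           return Math.max(1, (significantBits + 6) / 7);
       }

       // Returns `count` random non-negative values that each encode to exactly `bytes` bytes (1..5).
       static int[] valuesWithEncodedLength(int bytes, int count, long seed) {
           if (bytes < 1 || bytes > 5) {
               throw new IllegalArgumentException("varint32 length must be between 1 and 5");
           }
           long min = bytes == 1 ? 0L : 1L << (7 * (bytes - 1));
           long max = bytes == 5 ? Integer.MAX_VALUE : (1L << (7 * bytes)) - 1;
           Random random = new Random(seed);
           int[] values = new int[count];
           for (int i = 0; i < count; i++) {
               // floorMod keeps the offset in [0, max - min]; the slight modulo bias
               // does not matter for benchmark inputs
               values[i] = (int) (min + Math.floorMod(random.nextLong(), max - min + 1));
           }
           return values;
       }

       public static void main(String[] args) {
           for (int bytes = 1; bytes <= 5; bytes++) {
               int[] values = valuesWithEncodedLength(bytes, 1_000, 42L);
               for (int v : values) {
                   if (sizeOfUnsignedVarint(v) != bytes) {
                       throw new AssertionError(v + " does not encode to " + bytes + " bytes");
                   }
               }
               System.out.println(bytes + "-byte values OK, e.g. " + values[0]);
           }
       }
   }
   ```
   
   A benchmark could then take the byte length as a parameter and cycle through a pre-generated array, so that value generation stays out of the measured code.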





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1185820116


##########
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##########
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
         assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
     }
 
+    @Test
+    public void testCorrectnessWriteUnsignedVarlong() {

Review Comment:
   Done in the latest commit.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1182456902


##########
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##########
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
         assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
     }
 
+    @Test
+    public void testCorrectnessWriteUnsignedVarlong() {

Review Comment:
   I would be inclined to keep this test to catch potential bugs in future changes to the `WriteUnsignedVarlong` implementation. This test covers a larger range of values than the existing tests do.





[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1536165039

   Thank you @showuon and @ijuma for your time and patience in this review 🙏




[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1508353737

   @ijuma @mimaison please take a look at this PR. Ideally, I would like this to be part of 3.5. Note that the existing tests for `ByteUtils` conversions pass and I have added new tests for additional robustness. Hence, it should be safe to merge this.




[GitHub] [kafka] ijuma commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1509978997

   @divijvaidya Do we have any end-to-end test results? As stated previously, I'm not quite sure about the varint64 implementations: they seem to be the kind of thing that helps with microbenchmarks but is less effective in real workloads. The varint32 ones seem fine.




[GitHub] [kafka] showuon commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1536156206

   Failed tests are unrelated:
   ```
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest.testBackoffAndRetryUponRetriableError()
       Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
       Build / JDK 17 and Scala 2.13 / kafka.api.PlaintextAdminIntegrationTest.testAclOperations(String).quorum=zk
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
   ```




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1185820299


##########
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##########
@@ -241,6 +244,119 @@ public void testDouble() throws IOException {
         assertDoubleSerde(Double.NEGATIVE_INFINITY, 0xFFF0000000000000L);
     }
 
+    @Test
+    public void testCorrectnessReadUnsignedVarlong() {

Review Comment:
   done in the latest commit.





[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1448306949

   Tagging a few folks who may be interested in reviewing this: @ijuma @jasonk000 @hachikuji. Kindly review when you get an opportunity.




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1157177692


##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
      * @throws IOException              if {@link DataInput} throws {@link IOException}
      */
     public static long readVarlong(DataInput in) throws IOException {
-        long value = 0L;
-        int i = 0;
-        long b;
-        while (((b = in.readByte()) & 0x80) != 0) {
-            value |= (b & 0x7f) << i;
-            i += 7;
-            if (i > 63)
-                throw illegalVarlongException(value);
+        long raw = readUnsignedVarlong(in);
+        return (raw >>> 1) ^ -(raw & 1);
+    }
+
+    private static long readUnsignedVarlong(DataInput in) throws IOException {
+        byte tmp = in.readByte();

Review Comment:
   Good idea. I have added benchmarks for different sizes of varInt and varLong, as you recommended.
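One way to feed size-parameterized benchmarks is to draw random values whose unsigned varint encoding is exactly N bytes long. The sketch below is an assumption about how that could be done, not the benchmark code from the PR. The class and method names are hypothetical. In JMH, the byte count would presumably come from a `@Param` field.

```java
import java.util.Random;

// Hypothetical input generator for size-parameterized varint benchmarks.
final class VarintSizeInputs {

    /** Encoded length of an unsigned varint: one byte per started 7-bit group, minimum 1. */
    static int sizeOfUnsignedVarint(int value) {
        int significantBits = 32 - Integer.numberOfLeadingZeros(value);
        return Math.max(1, (significantBits + 6) / 7);
    }

    /**
     * Random value whose unsigned varint encoding is exactly {@code bytes} long (1..5).
     * Setting bit 7 * (bytes - 1) forces at least that many groups; the mask caps it.
     */
    static int randomValueOfEncodedLength(int bytes, Random rnd) {
        if (bytes < 1 || bytes > 5) {
            throw new IllegalArgumentException("bytes must be in [1, 5]: " + bytes);
        }
        if (bytes == 1) {
            return rnd.nextInt() & 0x7F;
        }
        int lowestBitOfTopGroup = 1 << (7 * (bytes - 1));
        if (bytes == 5) {
            // All 32 bits are usable; only bit 28 has to be set.
            return rnd.nextInt() | lowestBitOfTopGroup;
        }
        int mask = (1 << (7 * bytes)) - 1;
        return (rnd.nextInt() & mask) | lowestBitOfTopGroup;
    }

    /** Deterministic input array for one benchmark parameter value. */
    static int[] inputs(int bytes, int count, long seed) {
        Random rnd = new Random(seed);
        int[] values = new int[count];
        for (int i = 0; i < count; i++) {
            values[i] = randomValueOfEncodedLength(bytes, rnd);
        }
        return values;
    }
}
```

Using a fixed seed keeps the input set identical across forks, so the comparison between the old and new implementations isn't skewed by different value distributions.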





[GitHub] [kafka] showuon commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1160405765


##########
clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java:
##########
@@ -298,7 +414,7 @@ public void testReadInt() {
     }
 
     private void assertUnsignedVarintSerde(int value, byte[] expectedEncoding) throws IOException {
-        ByteBuffer buf = ByteBuffer.allocate(32);
+        ByteBuffer buf = ByteBuffer.allocate(MAX_LENGTH_VARINT);

Review Comment:
   Nice cleanup
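For context, a fixed-size buffer is enough because each encoded byte carries 7 payload bits. A 32-bit value therefore needs at most five bytes and a 64-bit value at most ten. This assumes `MAX_LENGTH_VARINT` is defined as that bound, since its value is not shown in this excerpt.

```latex
\text{max bytes} = \left\lceil \frac{\text{bit width}}{7} \right\rceil,
\qquad \left\lceil \tfrac{32}{7} \right\rceil = 5 \ \text{(varint)},
\qquad \left\lceil \tfrac{64}{7} \right\rceil = 10 \ \text{(varlong)}
```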



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java:
##########
@@ -17,68 +17,241 @@
 
 package org.apache.kafka.jmh.util;
 
-import java.util.concurrent.ThreadLocalRandom;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.utils.ByteUtils;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.CompilerControl;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 
-@State(Scope.Benchmark)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@OutputTimeUnit(TimeUnit.SECONDS)
 @Fork(3)
-@Warmup(iterations = 5, time = 1)
-@Measurement(iterations = 10, time = 1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
 public class ByteUtilsBenchmark {

Review Comment:
   Could we add a description to `ByteUtilsBenchmark` explaining what this class benchmarks, which implementation we adopted in the end, and why? Otherwise, other developers may be confused about what this benchmark is trying to do.



##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -227,17 +268,73 @@ public static int readVarint(DataInput in) throws IOException {
      * @throws IOException              if {@link DataInput} throws {@link IOException}
      */
     public static long readVarlong(DataInput in) throws IOException {
-        long value = 0L;
-        int i = 0;
-        long b;
-        while (((b = in.readByte()) & 0x80) != 0) {
-            value |= (b & 0x7f) << i;
-            i += 7;
-            if (i > 63)
-                throw illegalVarlongException(value);
+        long raw = readUnsignedVarlong(in);
+        return (raw >>> 1) ^ -(raw & 1);
+    }
+
+    /**
+     * For implementation details see {@link #readUnsignedVarlong(ByteBuffer)}

Review Comment:
   I checked the javadoc of `#readUnsignedVarlong(ByteBuffer)`, and it doesn't describe the implementation in any detail. Could we copy the javadoc from `#readUnsignedVarlong(ByteBuffer)` here, or update the javadoc here instead?
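The signed wrapper in the same hunk uses the one-liner `(raw >>> 1) ^ -(raw & 1)`. This is ZigZag decoding, the scheme protobuf uses for `sint32`/`sint64`. Below is a sketch of javadoc wording that spells this out, together with the matching encoder for reference. The class and method names are hypothetical and not part of Kafka's `ByteUtils`.

```java
// Hypothetical helper illustrating ZigZag encoding; not part of Kafka's ByteUtils.
final class ZigZag {

    /**
     * Maps a signed value to an unsigned one so that small magnitudes stay small:
     * 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
     * {@code n >> 63} is all ones for negative n and zero otherwise, so negative
     * values have their shifted bits flipped.
     */
    static long encode(long n) {
        return (n << 1) ^ (n >> 63);
    }

    /**
     * Inverse of {@link #encode(long)}, the same expression used by readVarlong:
     * {@code raw >>> 1} undoes the shift, and {@code -(raw & 1)} is all ones exactly
     * when the low bit marks a negative value, which flips the bits back.
     */
    static long decode(long raw) {
        return (raw >>> 1) ^ -(raw & 1);
    }
}
```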





[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1506707381

   Rebased from trunk to fix the unrelated integration test failures.




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1162901912


##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ByteUtilsBenchmark.java:
##########
@@ -17,68 +17,241 @@
 
 package org.apache.kafka.jmh.util;
 
-import java.util.concurrent.ThreadLocalRandom;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.utils.ByteUtils;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.CompilerControl;
 import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 
-@State(Scope.Benchmark)
-@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@OutputTimeUnit(TimeUnit.SECONDS)
 @Fork(3)
-@Warmup(iterations = 5, time = 1)
-@Measurement(iterations = 10, time = 1)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
 public class ByteUtilsBenchmark {

Review Comment:
   Done in the latest commit.





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1162915231


##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -227,17 +268,73 @@ public static int readVarint(DataInput in) throws IOException {
      * @throws IOException              if {@link DataInput} throws {@link IOException}
      */
     public static long readVarlong(DataInput in) throws IOException {
-        long value = 0L;
-        int i = 0;
-        long b;
-        while (((b = in.readByte()) & 0x80) != 0) {
-            value |= (b & 0x7f) << i;
-            i += 7;
-            if (i > 63)
-                throw illegalVarlongException(value);
+        long raw = readUnsignedVarlong(in);
+        return (raw >>> 1) ^ -(raw & 1);
+    }
+
+    /**
+     * For implementation details see {@link #readUnsignedVarlong(ByteBuffer)}

Review Comment:
   done in latest commit





[GitHub] [kafka] divijvaidya commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1523106081

   @ijuma this is ready for your review. All failing tests are unrelated.




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120354148


##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -292,29 +415,66 @@ public static double readDouble(ByteBuffer buffer) {
      * @param buffer The output to write to
      */
     public static void writeUnsignedVarint(int value, ByteBuffer buffer) {

Review Comment:
   Netty uses the default loop-based implementation: https://github.com/netty/netty/blob/5d1f99655918c9c034ca090d51b64eced73f742f/codec/src/main/java/io/netty/handler/codec/protobuf/ProtobufVarint32LengthFieldPrepender.java#L58 
   
   I wasn't able to find any other project that does what we are doing, apart from the [blog post](https://steinborn.me/posts/performance/how-fast-can-you-write-a-varint/) I mentioned above and [this implementation](https://github.com/richardstartin/varints/blob/main/src/main/java/io/github/richardstartin/varints/SmartNoDataDependencyVarIntState.java).
   
   Note that protobuf uses an unrolled implementation in its C++ code at
   https://github.com/protocolbuffers/protobuf/blob/2dc5338ea222e1f4e0357e46b702ed6a0e82aaeb/src/google/protobuf/io/coded_stream.h#L913
   This doesn't directly apply to us, since the Java and C++ compilers behave differently, but I'm adding it here as a data point.
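To make the comparison concrete, here is a sketch of both styles for the 32-bit writer. The first is a loop like the one Netty uses. The second is a fully unrolled version in the spirit of the linked blog post. This is illustrative code under hypothetical names, not the code in this PR.

```java
import java.nio.ByteBuffer;

// Hypothetical class; illustrates the two styles, not Kafka's actual ByteUtils code.
final class VarintWriters {

    /** Loop-based writer: emit 7 bits per iteration until the remainder fits in one byte. */
    static void writeLoop(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80)); // low 7 bits plus continuation flag
            value >>>= 7;
        }
        buf.put((byte) value); // final byte, continuation flag clear
    }

    /** Unrolled writer: one branch per possible encoded length (1 to 5 bytes), no loop. */
    static void writeUnrolled(int value, ByteBuffer buf) {
        if ((value & (0xFFFFFFFF << 7)) == 0) {
            buf.put((byte) value);
        } else {
            buf.put((byte) ((value & 0x7F) | 0x80));
            if ((value & (0xFFFFFFFF << 14)) == 0) {
                buf.put((byte) (value >>> 7));
            } else {
                buf.put((byte) (((value >>> 7) & 0x7F) | 0x80));
                if ((value & (0xFFFFFFFF << 21)) == 0) {
                    buf.put((byte) (value >>> 14));
                } else {
                    buf.put((byte) (((value >>> 14) & 0x7F) | 0x80));
                    if ((value & (0xFFFFFFFF << 28)) == 0) {
                        buf.put((byte) (value >>> 21));
                    } else {
                        buf.put((byte) (((value >>> 21) & 0x7F) | 0x80));
                        buf.put((byte) (value >>> 28)); // at most 4 remaining bits
                    }
                }
            }
        }
    }
}
```

The unrolled form removes the loop back-edge and the repeated `value >>>= 7` update at the cost of a larger method. Whether that pays off depends on the JIT and on the value distribution, which is what the benchmarks measure.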





[GitHub] [kafka] Hangleton commented on pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13312:
URL: https://github.com/apache/kafka/pull/13312#issuecomment-1447962623

   Thanks for the PR. It is interesting that unrolling the loop while keeping the same underlying logic yields this increase in throughput for writing varints. A few questions:
   
   - What CPU architectures has this been tested on? 
   - Do we have any data on how these gains translate in latency reduction with real requests from canonical workloads?




[GitHub] [kafka] divijvaidya commented on a diff in pull request #13312: KAFKA-14766: Improve performance of VarInt encoding and decoding

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #13312:
URL: https://github.com/apache/kafka/pull/13312#discussion_r1120328865


##########
clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java:
##########
@@ -227,17 +259,62 @@ public static int readVarint(DataInput in) throws IOException {
      * @throws IOException              if {@link DataInput} throws {@link IOException}
      */
     public static long readVarlong(DataInput in) throws IOException {
-        long value = 0L;
-        int i = 0;
-        long b;
-        while (((b = in.readByte()) & 0x80) != 0) {
-            value |= (b & 0x7f) << i;
-            i += 7;
-            if (i > 63)
-                throw illegalVarlongException(value);
+        long raw = readUnsignedVarlong(in);
+        return (raw >>> 1) ^ -(raw & 1);
+    }
+
+    private static long readUnsignedVarlong(DataInput in) throws IOException {
+        byte tmp = in.readByte();

Review Comment:
   No, I wrote this by extending Netty's varint32 implementation to 64 bits (it's simple loop unrolling, no fancy logic). The JIT's inlining heuristics become harder to predict as the method grows, which is why we don't see much benefit for the 64-bit implementation here. I don't have a strong opinion on the 64-bit implementation and would be happy to fall back to the exact implementation Protobuf uses. [1]
   
   
   [1] https://github.com/protocolbuffers/protobuf/blob/main/java/core/src/main/java/com/google/protobuf/CodedInputStream.java#L1048
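For illustration, here is the unrolled shape for the 32-bit read against a `ByteBuffer`, written with early returns, next to the loop it replaces. The names are hypothetical and this is not the PR's code. A 64-bit version repeats the same step up to ten times instead of five, which is the growth in method size referred to above.

```java
import java.nio.ByteBuffer;

// Hypothetical class; a sketch of the two decoding styles for unsigned varints.
final class VarintReaders {

    /** Loop-based reference decoder, same shape as the simple implementation in the test. */
    static int readLoop(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
        }
        return value | (b << shift);
    }

    /** Unrolled decoder: a non-negative byte means the continuation bit is clear. */
    static int readUnrolled(ByteBuffer buf) {
        byte tmp = buf.get();
        if (tmp >= 0) {
            return tmp;
        }
        int result = tmp & 0x7F;
        if ((tmp = buf.get()) >= 0) {
            return result | (tmp << 7);
        }
        result |= (tmp & 0x7F) << 7;
        if ((tmp = buf.get()) >= 0) {
            return result | (tmp << 14);
        }
        result |= (tmp & 0x7F) << 14;
        if ((tmp = buf.get()) >= 0) {
            return result | (tmp << 21);
        }
        result |= (tmp & 0x7F) << 21;
        tmp = buf.get();
        if (tmp < 0) {
            // A continuation bit on the fifth byte means the value exceeds 32 bits.
            throw new IllegalArgumentException("varint is longer than 5 bytes");
        }
        return result | (tmp << 28);
    }
}
```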


