Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/25 15:55:36 UTC

[GitHub] [kafka] LinShunKang opened a new pull request, #12685: KIP-872: Add Serializer#serializeToByteBuffer() to reduce memory copying

LinShunKang opened a new pull request, #12685:
URL: https://github.com/apache/kafka/pull/12685

   Implementation of [KIP-872](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250526734


##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
     }
 
     @Test
-    public void testByteBufferSerializer() {
+    public void testByteBufferSerCompatibility() {
         final byte[] bytes = "Hello".getBytes(UTF_8);
         final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
         final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
         final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();

Review Comment:
   > Source ByteBuffer has an underlying array but it is shared with other byte buffers (e.g. this source bytebuffer is carved out using slice)
   
   This test case is not covered here. 
   You can use something like:
   ```
   val garbageBytes = "garbage bytes".getBytes(Charset.defaultCharset())
   val extraBytes = "extra bytes".getBytes(Charset.defaultCharset())
   val bigReadBuffer = ByteBuffer.allocate(channel.size().toInt + garbageBytes.length + extraBytes.length)
   bigReadBuffer.put(extraBytes)
   val readBuffer = bigReadBuffer.slice()
   serializer.serialize(topic, readBuffer)
   ```
   
   The difference from using duplicate() is that duplicate() retains the original position with respect to the underlying array. Using duplicate() won't catch bugs where some code incorrectly assumes that offset 0 of the underlying array is the same as pos=0. For duplicate() that assumption holds, but for slice() it may not.
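   
   For reference, a minimal Java sketch (hypothetical class name, not code from the PR) of how slice() produces a view whose arrayOffset() is non-zero while its position is 0, which is exactly the situation the suggested test case would exercise:
   ```
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   
   public class SliceOffsetSketch {
       public static void main(String[] args) {
           byte[] prefix = "extra bytes".getBytes(StandardCharsets.UTF_8);
           byte[] payload = "Hello".getBytes(StandardCharsets.UTF_8);
   
           ByteBuffer big = ByteBuffer.allocate(prefix.length + payload.length);
           big.put(prefix);                 // advance position past the prefix
           ByteBuffer sliced = big.slice(); // sliced.position() == 0, sliced.arrayOffset() == prefix.length
           sliced.put(payload).flip();      // the slice now holds only "Hello"
   
           // Prints "11 0": index 0 of the backing array is not position 0 of the slice,
           // so code that assumes arrayOffset() == 0 would read the "extra bytes" prefix instead.
           System.out.println(sliced.arrayOffset() + " " + sliced.position());
       }
   }
   ```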



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+    @Param({"128", "256"})
+    private int bytes;
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    private ByteBuffer byteBuffer;
+
+    private byte[] bytesArray;
+
+    @Setup
+    public void setup() {
+        final SplittableRandom random = new SplittableRandom();

Review Comment:
   nit
   To get consistent results across benchmark runs, it is preferred to seed the random number generator with a constant value.
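   
   For example, a small sketch of that change (the `seed` @Param shown here is an assumption, mirroring the seed column that appears in the benchmark results posted later in this thread):
   ```
   @Param({"42"})
   private int seed;
   
   @Setup
   public void setup() {
       // A fixed seed makes the generated bytes identical across benchmark runs
       final SplittableRandom random = new SplittableRandom(seed);
       // ... fill bytesArray and byteBuffer exactly as before
   }
   ```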



##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+    @Param({"128", "256"})
+    private int bytes;
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    private ByteBuffer byteBuffer;
+
+    private byte[] bytesArray;
+
+    @Setup
+    public void setup() {
+        final SplittableRandom random = new SplittableRandom();
+        bytesArray = new byte[bytes];
+        byteBuffer = direct ? ByteBuffer.allocateDirect(bytes) : ByteBuffer.allocate(bytes);
+        for (int i = 0; i < bytes; i++) {
+            final byte b = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
+            byteBuffer.put(i, b);
+            bytesArray[i] = b;
+        }
+
+        if (readonly) {
+            byteBuffer = byteBuffer.asReadOnlyBuffer();
+        }
+    }
+
+    @Benchmark
+    public int byteBufferMurmur2() {
+        return Utils.murmur2(byteBuffer);

Review Comment:
   It is preferred to use a Blackhole [1] to consume the returned value. If we don't, we risk the compiler optimising this out on the assumption that the result is never used.
   
   [1] https://github.com/openjdk/jmh/blob/master/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_09_Blackholes.java
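   
   A sketch of the Blackhole variant being suggested (for completeness: JMH also implicitly consumes a value returned from a @Benchmark method, which is an alternative way to avoid dead-code elimination):
   ```
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.infra.Blackhole;
   
   @Benchmark
   public void byteBufferMurmur2(Blackhole blackhole) {
       // Hand the hash to JMH explicitly so the JIT cannot treat it as dead code
       blackhole.consume(Utils.murmur2(byteBuffer));
   }
   ```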



##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
     }
 
     @Test
-    public void testByteBufferSerializer() {
+    public void testByteBufferSerCompatibility() {
         final byte[] bytes = "Hello".getBytes(UTF_8);
         final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
         final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
         final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+        final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();
+        final ByteBuffer heapBuffer5 = heapBuffer2.duplicate();
+        final ByteBuffer heapBuffer6 = heapBuffer0.asReadOnlyBuffer();
+        final ByteBuffer heapBuffer7 = heapBuffer1.asReadOnlyBuffer();
+        final ByteBuffer heapBuffer8 = heapBuffer2.asReadOnlyBuffer();
         final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
         final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
         try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
-            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
-            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
+            assertNull(serializer.serialize(topic, null));
+            assertNull(serializer.serializeToByteBuffer(topic, null));
+            assertArrayEquals(new byte[0], serializer.serialize(topic, ByteBuffer.allocate(0)));
+
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer0);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer1);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer2);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer3);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer4);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer5);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer6);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer7);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer8);
+
+            testByteBufferSerCompatibility0(serializer, bytes, directBuffer0);
+            testByteBufferSerCompatibility0(serializer, bytes, directBuffer1);
+        }
+    }
+
+    private void testByteBufferSerCompatibility0(ByteBufferSerializer serializer,
+                                                 byte[] expectedBytes,
+                                                 ByteBuffer buffer) {
+        final ByteBuffer duplicatedBuf0 = buffer.duplicate();
+        final ByteBuffer duplicatedBuf1 = buffer.duplicate();
+        assertEquals(duplicatedBuf0, duplicatedBuf1);

Review Comment:
   Isn't this assertion unnecessary? We just duplicated the buffer in the two lines above. With this assertion we are actually testing whether duplicate() works correctly or not! We don't need to test that.



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {
+        final int length = data.remaining();
+        final int seed = 0x9747b28c;
+        // 'm' and 'r' are mixing constants generated offline.
+        // They're not really 'magic', they just happen to work well.
+        final int m = 0x5bd1e995;
+        final int r = 24;
+
+        // Initialize the hash to a random value
+        int h = seed ^ length;
+        final int length4 = length / 4;
+        data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);

Review Comment:
   I understand that you are doing this because murmur2 works on little endian order. Please add a comment explaining that here.
   
   However (and correct me if I am wrong), this line says, "if the current buffer does not interpret the underlying ordering of data as little endian, then create a new buffer which will interpret the underlying data as little endian". This logic seems incorrect to me, because what if the underlying data is in big endian format? Using a ByteBuffer to forcefully view it in little endian won't change the underlying data.
   
   Alternatively, let's do two things:
   1. If the incoming ByteBuffer in Serializer is not in little endian, then we need to convert the underlying data into little endian (and yes, that will be a data copy).
   2. Remove this statement from here and instead replace it with a check that throws an exception if the method receives a ByteBuffer that is not configured as little endian.



##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
     }
 
     @Test
-    public void testByteBufferSerializer() {
+    public void testByteBufferSerCompatibility() {
         final byte[] bytes = "Hello".getBytes(UTF_8);
         final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
         final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
         final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+        final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();

Review Comment:
   We should use `@Parameterized` here for all these different parameters that we are testing.



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   Seems like we have a 13% regression for non-direct buffer cases and a similar improvement in direct buffer cases. The additional computation we are performing for the non-direct buffer is `data.arrayOffset() + data.position()`, which is done once (hopefully, the compiler is intelligent enough to do that). Could we perhaps take it out of the loop, just to be doubly sure?
   
   Any other thoughts on why we have a perf difference between direct vs non-direct buffers here? I expect a performance difference between them when GC is involved, but in this case we are not creating more objects (except when calling slice()).
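   
   A rough, hypothetical sketch of the suggested hoisting for the heap-buffer path (not the PR's actual code; the tail bytes and final mixing are omitted and would follow murmur2(byte[])):
   ```
   static int murmur2Heap(final ByteBuffer data) {
       final byte[] array = data.array();
       // Hoisted once, outside the loop
       final int base = data.arrayOffset() + data.position();
       final int length = data.remaining();
       final int seed = 0x9747b28c;
       final int m = 0x5bd1e995;
       final int r = 24;
       int h = seed ^ length;
       final int length4 = length / 4;
       for (int i = 0; i < length4; i++) {
           final int i4 = base + i * 4;
           // Assemble each 4-byte block in little endian order, as murmur2(byte[]) does
           int k = (array[i4] & 0xff) + ((array[i4 + 1] & 0xff) << 8)
                   + ((array[i4 + 2] & 0xff) << 16) + ((array[i4 + 3] & 0xff) << 24);
           k *= m;
           k ^= k >>> r;
           k *= m;
           h *= m;
           h ^= k;
       }
       // ... handle the remaining (length % 4) bytes and the final mixing as in murmur2(byte[])
       return h;
   }
   ```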



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250943138


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -321,13 +322,20 @@ public int partition() {
         }
     }
 
-    /*
+    /**
      * Default hashing function to choose a partition from the serialized key bytes
      */
     public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
         return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
     }
 
+    /**
+     * Default hashing function to choose a partition from the serialized key bytes
+     */
+    public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) {
+        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;

Review Comment:
   Got it.



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250475769


##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
         data.flip();
         return Utils.toArray(data);
     }
+
+    /**
+     * Note that this method will modify the position and limit of the input ByteBuffer.
+     *
+     * @param topic   topic associated with data
+     * @param data    typed data
+     * @return serialized ByteBuffer
+     */
+    @Override
+    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {

Review Comment:
   ah - my bad, I meant to say limit = capacity
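   
   For context, a tiny illustration of the state being referred to: a freshly written buffer that has not been flipped still has limit == capacity, and flip() moves the limit down to the end of the written data:
   ```
   ByteBuffer buf = ByteBuffer.allocate(8);
   buf.put(new byte[]{1, 2}); // position = 2, limit = capacity = 8
   buf.flip();                // position = 0, limit = 2 (only the written bytes remain readable)
   ```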



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1257448258


##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+    @Param({"128", "256"})
+    private int bytes;
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    private ByteBuffer byteBuffer;
+
+    private byte[] bytesArray;
+
+    @Setup
+    public void setup() {
+        final SplittableRandom random = new SplittableRandom();

Review Comment:
   After specifying a fixed seed for `SplittableRandom`:
   ```
   Benchmark                              (bytes)  (direct)  (readonly)  (seed)   Mode  Cnt      Score     Error   Units
   MurMurHashBenchmark.byteArrayMurmur2       128     false       false      42  thrpt   10  23981.666 ± 121.983  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128     false        true      42  thrpt   10  23922.919 ± 154.278  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128      true       false      42  thrpt   10  23953.261 ± 141.848  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128      true        true      42  thrpt   10  24006.351 ± 111.603  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256     false       false      42  thrpt   10  12331.088 ±  71.559  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256     false        true      42  thrpt   10  12314.595 ±  59.174  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256      true       false      42  thrpt   10  12308.788 ±  76.674  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256      true        true      42  thrpt   10  12310.486 ±  73.764  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128     false       false      42  thrpt   10  20567.501 ± 152.677  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128     false        true      42  thrpt   10  20647.973 ± 155.602  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128      true       false      42  thrpt   10  26602.151 ± 478.886  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128      true        true      42  thrpt   10  26974.506 ± 227.064  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256     false       false      42  thrpt   10  11296.852 ±  50.338  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256     false        true      42  thrpt   10  11013.108 ± 103.543  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256      true       false      42  thrpt   10  14734.871 ± 156.346  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256      true        true      42  thrpt   10  14710.254 ± 182.270  ops/ms
   ```



[GitHub] [kafka] LinShunKang commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1602048965

   Hi, @divijvaidya @showuon @dengziming @vvcephei 
   KIP-872 has PASSED, and I've submitted my code implementation for it. It would be great if you could spare some time to review it. Your insights and suggestions would be valuable. Thanks in advance!


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1264484155


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {
+        final int length = data.remaining();
+        final int seed = 0x9747b28c;
+        // 'm' and 'r' are mixing constants generated offline.
+        // They're not really 'magic', they just happen to work well.
+        final int m = 0x5bd1e995;
+        final int r = 24;
+
+        // Initialize the hash to a random value
+        int h = seed ^ length;
+        final int length4 = length / 4;
+        data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);

Review Comment:
   > > I don't think it's necessary to do that
   > 
   > can you help me understand how this works? I am concerned about the fact that murmur2 requires little endian order but incoming data could be in big endian format.
   
   We can approach the problem this way: as long as the value of the variable `k` obtained in `Utils#murmur2(ByteBuffer)` is the same as the one obtained in `Utils#murmur2(byte[])`, the two methods are guaranteed to produce the same result.
   
   Next, I will use a simple example to explain why the correctness of `Utils#murmur2(ByteBuffer)` is not affected by whether the `ByteBuffer` passed by the user is in `BIG_ENDIAN` or `LITTLE_ENDIAN` order.
   
   ```
   final ByteBuffer heapBuffer = ByteBuffer.allocate(4).order(BIG_ENDIAN);
   heapBuffer.putInt(1234567890); // [73, -106, 2, -46]
   heapBuffer.flip();
   
   // [73, -106, 2, -46]
   final byte[] bytes = Utils.toArray(heapBuffer); 
   
   // obtain variable k from bytes just like Utils#murmur2(byte[])
   final int k0 = (bytes[0] & 0xff) + ((bytes[1] & 0xff) << 8) + ((bytes[2] & 0xff) << 16) + ((bytes[3] & 0xff) << 24);
   
   // obtain variable k from bytes just like Utils#murmur2(ByteBuffer)
   final int k1 = heapBuffer.slice().order(LITTLE_ENDIAN).getInt(0);
   
   // It will print `k0=-771582391, k1=-771582391`
   System.out.println("k0=" + k0 + ", k1=" + k1);
   ```
   
   NOTE: `Utils#murmur2(ByteBuffer)` does not require the `ByteBuffer` passed by the user to be in `LITTLE_ENDIAN` order. `Utils#murmur2(ByteBuffer)` simply reads the data from the `ByteBuffer` in `LITTLE_ENDIAN` order.
   



Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1925307450

   > @LinShunKang It looks like you need to rebase this PR to resolve conflicts. Ping @divijvaidya @showuon @dengziming, can you help get this merged? Thanks
   
   @mimaison I would like to resolve conflicts, but due to the presence of [#14617](https://github.com/apache/kafka/pull/14617), I don't know how to implement `ByteBufferSerializer#serializeToByteBuffer(String, ByteBuffer)`.


[GitHub] [kafka] kirktrue commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1191281659


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1383,9 +1386,13 @@ private ClusterResourceListeners configureClusterResourceListeners(Serializer<K>
      * can be used (the partition is then calculated by built-in
      * partitioning logic).
      */
-    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {

Review Comment:
   Unless my eyes deceive me, this is just formatting, correct?



##########
clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java:
##########
@@ -31,13 +34,27 @@ public interface Partitioner extends Configurable, Closeable {
      *
      * @param topic The topic name
      * @param key The key to partition on (or null if no key)
-     * @param keyBytes The serialized key to partition on( or null if no key)
+     * @param keyBytes The serialized key to partition on(or null if no key)

Review Comment:
   nit: Can we change:
   
   `The serialized key to partition on(or null if no key)`
   
   to
   
   `The serialized key to partition on (or null if no key)`



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1192944064


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1383,9 +1386,13 @@ private ClusterResourceListeners configureClusterResourceListeners(Serializer<K>
      * can be used (the partition is then calculated by built-in
      * partitioning logic).
      */
-    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {

Review Comment:
   Yes, I have already rolled back this change.



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250942671


##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+    @Param({"128", "256"})
+    private int bytes;
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    private ByteBuffer byteBuffer;
+
+    private byte[] bytesArray;
+
+    @Setup
+    public void setup() {
+        final SplittableRandom random = new SplittableRandom();
+        bytesArray = new byte[bytes];
+        byteBuffer = direct ? ByteBuffer.allocateDirect(bytes) : ByteBuffer.allocate(bytes);
+        for (int i = 0; i < bytes; i++) {
+            final byte b = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
+            byteBuffer.put(i, b);
+            bytesArray[i] = b;
+        }
+
+        if (readonly) {
+            byteBuffer = byteBuffer.asReadOnlyBuffer();
+        }
+    }
+
+    @Benchmark
+    public int byteBufferMurmur2() {
+        return Utils.murmur2(byteBuffer);

Review Comment:
   ah, thanks! I learnt something new today.



[GitHub] [kafka] divijvaidya commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1616054098

   > Add @Deprecated annotation for Utils#murmur2(byte[])
   
   We should anyway start using murmur3 starting with Kafka 4.x. Also, perhaps the endianness-neutral version of it, otherwise we risk having different architectures/clients choosing partitions differently. If you are interested, a KIP would be required to move to murmur3, and with that KIP we can mark murmur2 as completely deprecated.


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1249235892


##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
         data.flip();
         return Utils.toArray(data);
     }
+
+    /**
+     * Note that this method will modify the position and limit of the input ByteBuffer.
+     *
+     * @param topic   topic associated with data
+     * @param data    typed data
+     * @return serialized ByteBuffer
+     */
+    @Override
+    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {

Review Comment:
   I have a little confusion:
   2. In ByteBuffer, `remaining = limit - position`, so if `position` > 0 then `remaining` must be lower than `limit`.
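   
   A quick illustration of that invariant:
   ```
   ByteBuffer buf = ByteBuffer.allocate(8);
   buf.put((byte) 1).put((byte) 2);      // position = 2, limit = 8
   System.out.println(buf.remaining());  // prints 6, i.e. limit - position, which is < limit whenever position > 0
   ```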



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250943986


##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
         data.flip();
         return Utils.toArray(data);
     }
+
+    /**
+     * Note that this method will modify the position and limit of the input ByteBuffer.
+     *
+     * @param topic   topic associated with data
+     * @param data    typed data
+     * @return serialized ByteBuffer
+     */
+    @Override
+    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {

Review Comment:
   No, actually number 1 is still missing. See my comment at https://github.com/apache/kafka/pull/12685#discussion_r1250526734



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1257492943


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {
+        final int length = data.remaining();
+        final int seed = 0x9747b28c;
+        // 'm' and 'r' are mixing constants generated offline.
+        // They're not really 'magic', they just happen to work well.
+        final int m = 0x5bd1e995;
+        final int r = 24;
+
+        // Initialize the hash to a random value
+        int h = seed ^ length;
+        final int length4 = length / 4;
+        data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);

Review Comment:
   I don't think it's necessary to do that. From this [commit](https://github.com/apache/kafka/pull/12685/commits/721db38b0580250a122f7dd178172ddec616cbe6), it seems that this change does not have any compatibility issues.



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1271319659


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   @divijvaidya 
   I optimized `Utils#murmur2(ByteBuffer)` through this [commit](https://github.com/apache/kafka/pull/12685/commits/3dec26dadf60c43e53ff8b13ef83206aabf82637), and here are the results:
   ```
   Benchmark                              (bytes)  (direct)  (littleEndian)  (seed)   Mode  Cnt      Score     Error   Units
   MurMurHashBenchmark.byteArrayMurmur2       128     false           false      42  thrpt   10  23952.036 ± 234.049  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128     false            true      42  thrpt   10  24006.164 ± 153.647  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128      true           false      42  thrpt   10  23997.223 ± 155.545  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128      true            true      42  thrpt   10  24014.765 ± 169.769  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256     false           false      42  thrpt   10  12314.915 ±  71.558  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256     false            true      42  thrpt   10  12312.495 ±  87.004  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256      true           false      42  thrpt   10  12299.405 ±  85.441  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256      true            true      42  thrpt   10  12289.485 ± 107.814  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128     false           false      42  thrpt   10  23436.309 ± 169.624  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128     false            true      42  thrpt   10  23436.878 ±  93.588  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128      true           false      42  thrpt   10  25995.246 ± 582.841  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128      true            true      42  thrpt   10  28573.727 ± 155.530  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256     false           false      42  thrpt   10  12210.036 ±  69.601  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256     false            true      42  thrpt   10  12184.773 ±  98.403  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256      true           false      42  thrpt   10  14500.743 ± 196.826  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256      true            true      42  thrpt   10  15421.734 ± 113.717  ops/ms
   ```
   
   After the optimization, the throughput of `Utils#murmur2(ByteBuffer)` in `HeapByteBuffer` **decreased** by only **2%** compared to `Utils#murmur2(byte[])`, while in `DirectByteBuffer`, the throughput **increased** by **8%**.



[GitHub] [kafka] LinShunKang commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1616603443

   > > Add @deprecated annotation for Utils#murmur2(byte[])
   > 
   > We should anyway start using murmur3 starting with Kafka 4.x. Also, perhaps the endianness-neutral version of it, otherwise we risk having different architectures/clients choosing partitions differently. If you are interested, a KIP would be required to move to murmur3, and with that KIP we can mark murmur2 as completely deprecated.
   
   1. When I add the @Deprecated annotation to Utils#murmur2(byte[]), the test cases will result in an error when executed.
   2. I am interested in switching to murmur3, but the decision of whether to use a byte-order-independent algorithm may require further discussion. Considering that there are Kafka clients in languages other than Java, we need an algorithm that is independent of language and computer architecture.


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250947571


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   That's fair. Prior to this change, byte[] would also have been stored contiguously, hence reading 4 bytes would have been faster, whereas HeapByteBuffer may perform random seeks to read the entire int.
   
   It's fine since I don't see how we can avoid this, but note that it may offset the gains we are getting by removing the data copy in this PR.



[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1257483057


##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
         data.flip();
         return Utils.toArray(data);
     }
+
+    /**
+     * Note that this method will modify the position and limit of the input ByteBuffer.
+     *
+     * @param topic   topic associated with data
+     * @param data    typed data
+     * @return serialized ByteBuffer
+     */
+    @Override
+    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {

Review Comment:
   > No, actually number 1 is still missing. See my comment at [#12685 (comment)](https://github.com/apache/kafka/pull/12685#discussion_r1250526734)
   
   The number 1 case has been covered: https://github.com/apache/kafka/pull/12685/commits/85d98090bf5433bc54f91c573dbc3f4a48d77ebc



[GitHub] [kafka] LinShunKang commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1632034547

   Hello @showuon @dengziming @vvcephei, I would like to kindly request your code review for my recent contribution related to KIP-872. Your expertise and feedback would be highly appreciated. Thank you!
   


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250933347


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   > Seems like we have a 13% regression for non-direct buffer cases and similar improvement in direct buffer cases. The additional computation we are performing for non-direct buffer is `data.arrayOffset() + data.position()` which is done once (hopefully, compiler is intelligent enough to that). Could we perhaps take it out of the loop just to be double sure.
   > 
   > Any other thoughts on why do we have a perf difference between direct vs non-direct buffer here? I expect a performance difference between them when GC is involved but in this case we are not creating more objects (except when calling slice()).
   
   I believe `data.arrayOffset() + data.position()` is not the key issue; the difference comes from `ByteBuffer#getInt(int)`:
   * On aarch64/x86_64/x86/ppc64le/ppc64/i386, `DirectByteBuffer` uses `Unsafe#getInt(int)` to retrieve the 4 bytes in a single operation.
   * Regardless of architecture, `HeapByteBuffer` uses `ByteBuffer#_get(int)` to retrieve the 4 bytes in four separate operations, roughly as sketched below.
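   
   A standalone illustration of the two access patterns (simplified; these are not the actual JDK internals, and both methods return the same value when the buffer order is LITTLE_ENDIAN):
   ```
   // one wide read at an absolute index, as on the DirectByteBuffer fast path
   static int oneWideRead(ByteBuffer buf, int index) {
       return buf.getInt(index);
   }
   
   // the same int assembled from four single-byte reads, as on the HeapByteBuffer path
   static int fourByteReads(ByteBuffer buf, int index) {
       return (buf.get(index)     & 0xff)
            | (buf.get(index + 1) & 0xff) << 8
            | (buf.get(index + 2) & 0xff) << 16
            | (buf.get(index + 3) & 0xff) << 24;
   }
   ```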



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1249312027


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -321,13 +322,20 @@ public int partition() {
         }
     }
 
-    /*
+    /**
      * Default hashing function to choose a partition from the serialized key bytes
      */
     public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
         return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
     }
 
+    /**
+     * Default hashing function to choose a partition from the serialized key bytes
+     */
+    public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) {
+        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;

Review Comment:
   `Utils#murmur2(ByteBuffer)` does not change the position or limit of the input `ByteBuffer`: it reads data only through the absolute `ByteBuffer#get(int)` and `ByteBuffer#getInt(int)` methods, and absolute reads never move the buffer's position (see the sketch below).
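   
   A minimal standalone sketch of the distinction (not the actual Utils code):
   ```
   ByteBuffer buf = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
   
   buf.getInt(0);   // absolute read: position stays at 0
   buf.get(4);      // absolute read: position still 0
   
   buf.getInt();    // relative read: position advances from 0 to 4
   ```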



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1249416755


##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
         data.flip();
         return Utils.toArray(data);
     }
+
+    /**
+     * Note that this method will modify the position and limit of the input ByteBuffer.
+     *
+     * @param topic   topic associated with data
+     * @param data    typed data
+     * @return serialized ByteBuffer
+     */
+    @Override
+    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {

Review Comment:
   All test cases have been added except for the second one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1607609145

   @LinShunKang Give me a few days. I will get to it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1257491896


##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
     }
 
     @Test
-    public void testByteBufferSerializer() {
+    public void testByteBufferSerCompatibility() {
         final byte[] bytes = "Hello".getBytes(UTF_8);
         final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
         final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
         final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+        final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();

Review Comment:
   OK, done, see https://github.com/apache/kafka/commit/721db38b0580250a122f7dd178172ddec616cbe6



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1862895717

   @divijvaidya @showuon @dengziming You voted on the KIP ([KIP-872](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828)), can you help move this PR forward? It has been open for over a year! Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1546596937

   > LGTM overall. I wish the murmur code was somehow generalized so that there wasn't two versions of it. We'll just have to be careful that it doesn't diverge.
   
   Thank you for your review!
   
   Utils#murmur2(byte[]) and Utils#murmur2(ByteBuffer) are only used for Partitioners, so we can:
   1. Modify all Partitioners to use Utils#murmur2(ByteBuffer)
   2. Add @Deprecated annotation for Utils#murmur2(byte[])
   3. Invoke Utils#murmur2(ByteBuffer.wrap(byte[])) in Utils#murmur2(byte[])
   
   To accomplish the first step above, we would need to add a method called WindowedSerializer#serializeBaseKeyToByteBuffer(String topic, Windowed data) and use it in WindowedStreamPartitioner#partition(). Steps 2 and 3 could look roughly like the sketch below.
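   
   A minimal sketch of steps 2 and 3, assuming `Utils#murmur2(ByteBuffer)` keeps exactly the same semantics as the `byte[]` version:
   ```
   /**
    * @deprecated Use {@link #murmur2(ByteBuffer)} instead.
    */
   @Deprecated
   public static int murmur2(final byte[] data) {
       // delegate so there is a single murmur2 implementation to maintain
       return murmur2(ByteBuffer.wrap(data));
   }
   ```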


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250918669


##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
         data.flip();
         return Utils.toArray(data);
     }
+
+    /**
+     * Note that this method will modify the position and limit of the input ByteBuffer.
+     *
+     * @param topic   topic associated with data
+     * @param data    typed data
+     * @return serialized ByteBuffer
+     */
+    @Override
+    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {

Review Comment:
   > ah - my bad, I meant to say limit = capacity
   
   OK, then all test cases have been covered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1248924148


##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
         data.flip();
         return Utils.toArray(data);
     }
+
+    /**
+     * Note that this method will modify the position and limit of the input ByteBuffer.
+     *
+     * @param topic   topic associated with data
+     * @param data    typed data
+     * @return serialized ByteBuffer
+     */
+    @Override
+    public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {

Review Comment:
   Please add the following cases in the unit test to validate the same behaviour before and after this PR (a rough sketch follows the list):
   1. Source ByteBuffer has an underlying array but it is shared with other byte buffers (e.g. this source ByteBuffer is carved out using slice)
   2. Source ByteBuffer has a non-zero position and limit = remaining.
   3. Source ByteBuffer has a non-zero position and limit = position.
   4. Source ByteBuffer is backed by a direct buffer.
   5. Incoming ByteBuffer is a read-only buffer.
   
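   A rough sketch of what these cases could look like (a hypothetical helper reusing the test class's `topic` field and `UTF_8` import; it only asserts that the old and new serialization paths agree on the payload, without fixing what that payload must be):
   ```
   private void assertSamePayload(ByteBufferSerializer serializer, ByteBuffer source) {
       final byte[] viaByteArray = serializer.serialize(topic, source.duplicate());
       final ByteBuffer viaByteBuffer = serializer.serializeToByteBuffer(topic, source.duplicate());
       final byte[] copy = new byte[viaByteBuffer.remaining()];
       viaByteBuffer.get(copy);
       assertArrayEquals(viaByteArray, copy);
   }
   
   // e.g. case 4 (direct buffer) and case 5 (read-only buffer); the other cases follow the same pattern
   assertSamePayload(serializer, ByteBuffer.allocateDirect(16).put("Hello".getBytes(UTF_8)));
   assertSamePayload(serializer, ByteBuffer.wrap("Hello".getBytes(UTF_8)).asReadOnlyBuffer());
   ```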



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -321,13 +322,20 @@ public int partition() {
         }
     }
 
-    /*
+    /**
      * Default hashing function to choose a partition from the serialized key bytes
      */
     public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
         return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
     }
 
+    /**
+     * Default hashing function to choose a partition from the serialized key bytes
+     */
+    public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) {
+        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;

Review Comment:
   murmur2() will change the position of the ByteBuffer and (correct me if I am wrong) we want to read it again to actually get the key (using remaining()). Wouldn't we end up reading the wrong key since the position has already been changed by this method?



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   This is performance-sensitive code. Can we have a microbenchmark to ensure that we haven't regressed here? You can use https://github.com/sangupta/murmur/blob/master/src/test/java/com/sangupta/murmur/MurmurPerformanceTests.java



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]

Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1923787527

   @LinShunKang It looks like you need to rebase this PR to resolve conflicts. 
   Ping @divijvaidya @showuon @dengziming, can you help get this merged? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1926319143

   @LinShunKang, sorry for being late. I had a quick look at https://github.com/apache/kafka/pull/14617, and it looks like `ByteBufferSerializer#serialize` is a public API that cannot be changed without a KIP. You know more about this than I do, so, in your opinion, what should we do from here? Could we keep `ByteBufferSerializer#serialize` as is and implement `ByteBufferSerializer#serializeToByteBuffer`? Do you think we should bring the people from that [PR](https://github.com/apache/kafka/pull/14617) into this PR for discussion? Or do you have any other thoughts? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1929526582

   > @LinShunKang, sorry for being late. I had a quick look at #14617, and it looks like `ByteBufferSerializer#serialize` is a public API that cannot be changed without a KIP. You know more about this than I do, so, in your opinion, what should we do from here? Could we not touch `ByteBufferSerializer#serialize` and only implement `ByteBufferSerializer#serializeToByteBuffer`? Do you think we should bring the people from that [PR](https://github.com/apache/kafka/pull/14617) into this PR for discussion? Or do you have any other thoughts?
   
   We can't just implement `ByteBufferSerializer#serializeToByteBuffer` on its own, because once a `Serializer` implements this method, `Serializer#serialize` will never be called. And `ByteBufferSerializer#serialize` has obvious logical problems.
   
   I believe we should address the logical issues in `ByteBufferSerializer#serialize`, but this will introduce breaking changes for existing users. And the `Serializer` should not have both `serialize` and `serializeToByteBuffer` methods at the same time. Therefore, I suggest we tackle these issues in Kafka 4.0, where we can modify the signature of the `Serializer`:
   from 
   ```
   //before 4.0
   public interface Serializer<T> {
   
       byte[] serialize(String topic, T data);
   
       default byte[] serialize(String topic, Headers headers, T data) {
           return serialize(topic, data);
       }
   }
   ```
   to
   ```
   //since 4.0
   public interface Serializer<T> {
   
       ByteBuffer serialize(String topic, T data);
   
       ByteBuffer serialize(String topic, Headers headers, T data);
   }
   ```
   
   Then we could announce to existing users that we have modified the signature of the `Serializer`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1192944264


##########
clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java:
##########
@@ -31,13 +34,27 @@ public interface Partitioner extends Configurable, Closeable {
      *
      * @param topic The topic name
      * @param key The key to partition on (or null if no key)
-     * @param keyBytes The serialized key to partition on( or null if no key)
+     * @param keyBytes The serialized key to partition on(or null if no key)

Review Comment:
   I have formatted the documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1249416231


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   I added MurMurHashBenchmark: 
   ```
   Benchmark                              (bytes)  (direct)  (readonly)   Mode  Cnt      Score     Error   Units
   MurMurHashBenchmark.byteArrayMurmur2       128     false       false  thrpt   10  23899.296 ± 286.754  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128     false        true  thrpt   10  23967.434 ± 175.354  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128      true       false  thrpt   10  23809.852 ± 285.063  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       128      true        true  thrpt   10  23897.052 ± 270.672  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256     false       false  thrpt   10  12312.813 ±  88.589  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256     false        true  thrpt   10  12321.595 ±  90.111  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256      true       false  thrpt   10  12305.468 ±  79.264  ops/ms
   MurMurHashBenchmark.byteArrayMurmur2       256      true        true  thrpt   10  12324.379 ±  83.018  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128     false       false  thrpt   10  20663.903 ± 149.945  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128     false        true  thrpt   10  20586.696 ± 283.857  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128      true       false  thrpt   10  26596.222 ± 638.404  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      128      true        true  thrpt   10  26719.416 ± 405.441  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256     false       false  thrpt   10  11270.697 ±  89.376  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256     false        true  thrpt   10  11015.074 ± 103.103  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256      true       false  thrpt   10  14733.461 ± 116.067  ops/ms
   MurMurHashBenchmark.byteBufferMurmur2      256      true        true  thrpt   10  14733.619 ± 250.710  ops/ms
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1251009330


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {

Review Comment:
   I believe that the performance impact of this part will only slightly diminish the optimizations brought by this PR, rather than completely offsetting them. This is because Utils#murmur2 is only applied to calculate the partition to which a key belongs, and the key is often much smaller than the value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250937252


##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+    @Param({"128", "256"})
+    private int bytes;
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    private ByteBuffer byteBuffer;
+
+    private byte[] bytesArray;
+
+    @Setup
+    public void setup() {
+        final SplittableRandom random = new SplittableRandom();
+        bytesArray = new byte[bytes];
+        byteBuffer = direct ? ByteBuffer.allocateDirect(bytes) : ByteBuffer.allocate(bytes);
+        for (int i = 0; i < bytes; i++) {
+            final byte b = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
+            byteBuffer.put(i, b);
+            bytesArray[i] = b;
+        }
+
+        if (readonly) {
+            byteBuffer = byteBuffer.asReadOnlyBuffer();
+        }
+    }
+
+    @Benchmark
+    public int byteBufferMurmur2() {
+        return Utils.murmur2(byteBuffer);

Review Comment:
   I believe that the approach of returning the result is correct: 
   https://github.com/openjdk/jmh/blob/47f651b72d05c2c335f8ced5ed33f2fb0dd26720/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_08_DeadCode.java#L79
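   
   For reference, both of these JMH patterns keep the computation alive (the Blackhole variant is not in this PR; it is shown only as the equivalent alternative):
   ```
   @Benchmark
   public int returnValue() {
       // the returned value is consumed by JMH, so the JIT cannot eliminate the call
       return Utils.murmur2(byteBuffer);
   }
   
   @Benchmark
   public void consumeExplicitly(Blackhole bh) {
       // org.openjdk.jmh.infra.Blackhole sinks the value explicitly
       bh.consume(Utils.murmur2(byteBuffer));
   }
   ```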



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250939186


##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+    @Param({"128", "256"})
+    private int bytes;
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    private ByteBuffer byteBuffer;
+
+    private byte[] bytesArray;
+
+    @Setup
+    public void setup() {
+        final SplittableRandom random = new SplittableRandom();

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250940246


##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
     }
 
     @Test
-    public void testByteBufferSerializer() {
+    public void testByteBufferSerCompatibility() {
         final byte[] bytes = "Hello".getBytes(UTF_8);
         final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
         final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
         final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+        final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();
+        final ByteBuffer heapBuffer5 = heapBuffer2.duplicate();
+        final ByteBuffer heapBuffer6 = heapBuffer0.asReadOnlyBuffer();
+        final ByteBuffer heapBuffer7 = heapBuffer1.asReadOnlyBuffer();
+        final ByteBuffer heapBuffer8 = heapBuffer2.asReadOnlyBuffer();
         final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
         final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
         try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
-            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
-            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
+            assertNull(serializer.serialize(topic, null));
+            assertNull(serializer.serializeToByteBuffer(topic, null));
+            assertArrayEquals(new byte[0], serializer.serialize(topic, ByteBuffer.allocate(0)));
+
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer0);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer1);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer2);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer3);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer4);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer5);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer6);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer7);
+            testByteBufferSerCompatibility0(serializer, bytes, heapBuffer8);
+
+            testByteBufferSerCompatibility0(serializer, bytes, directBuffer0);
+            testByteBufferSerCompatibility0(serializer, bytes, directBuffer1);
+        }
+    }
+
+    private void testByteBufferSerCompatibility0(ByteBufferSerializer serializer,
+                                                 byte[] expectedBytes,
+                                                 ByteBuffer buffer) {
+        final ByteBuffer duplicatedBuf0 = buffer.duplicate();
+        final ByteBuffer duplicatedBuf1 = buffer.duplicate();
+        assertEquals(duplicatedBuf0, duplicatedBuf1);

Review Comment:
   OK, I will delete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1264409803


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
         return h;
     }
 
+    /**
+     * Generates 32 bit murmur2 hash from ByteBuffer
+     * @param data ByteBuffer to hash
+     * @return 32 bit hash of the given ByteBuffer
+     */
+    @SuppressWarnings("fallthrough")
+    public static int murmur2(ByteBuffer data) {
+        final int length = data.remaining();
+        final int seed = 0x9747b28c;
+        // 'm' and 'r' are mixing constants generated offline.
+        // They're not really 'magic', they just happen to work well.
+        final int m = 0x5bd1e995;
+        final int r = 24;
+
+        // Initialize the hash to a random value
+        int h = seed ^ length;
+        final int length4 = length / 4;
+        data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);

Review Comment:
   > I don't think it's necessary to do that
   
   can you help me understand how this works? I am concerned about the fact that murmur2 requires little endian order but incoming data could be in big endian format. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org