Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/25 15:55:36 UTC
[GitHub] [kafka] LinShunKang opened a new pull request, #12685: KIP-872: Add Serializer#serializeToByteBuffer() to reduce memory copying
LinShunKang opened a new pull request, #12685:
URL: https://github.com/apache/kafka/pull/12685
Implementation of [KIP-872](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828)
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250526734
##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
}
@Test
- public void testByteBufferSerializer() {
+ public void testByteBufferSerCompatibility() {
final byte[] bytes = "Hello".getBytes(UTF_8);
final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+ final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
Review Comment:
> Source ByteBuffer has an underlying array but it is shared with other byte buffers (e.g. this source bytebuffer is carved out using slice)
This test case is not covered here.
You can use something like:
```
val garbageBytes = "garbage bytes".getBytes(Charset.defaultCharset())
val extraBytes = "extra bytes".getBytes(Charset.defaultCharset())
val bigReadBuffer = ByteBuffer.allocate(channel.size().toInt + garbageBytes.length + extraBytes.length)
bigReadBuffer.put(extraBytes)
val readBuffer = bigReadBuffer.slice()
serializer.serialize(topic, readBuffer)
```
The difference from duplicate() is that duplicate() retains the original position with respect to the underlying array. Using duplicate() won't catch bugs where code incorrectly assumes that offset 0 of the underlying array is the same as pos=0. With duplicate() that assumption holds, but with slice() it may not.
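To make the distinction concrete, here is a minimal JDK-only sketch. The class and method names are hypothetical and are not taken from the PR. It builds a slice whose `arrayOffset()` is non-zero and shows how a copy that ignores the offset returns the wrong bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SliceVsDuplicate {

    // Builds a slice whose first byte sits at a non-zero index of the shared backing array.
    static ByteBuffer slicedPayload(byte[] prefix, byte[] payload) {
        ByteBuffer big = ByteBuffer.allocate(prefix.length + payload.length);
        big.put(prefix);                 // position is now prefix.length
        ByteBuffer slice = big.slice();  // position 0, arrayOffset() == prefix.length
        slice.put(payload);
        slice.flip();                    // ready for reading: position 0, limit payload.length
        return slice;
    }

    // Correct copy: honours arrayOffset() as well as position().
    static byte[] copyRemaining(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];
        System.arraycopy(buf.array(), buf.arrayOffset() + buf.position(), out, 0, out.length);
        return out;
    }

    // Buggy copy: assumes position 0 maps to index 0 of the backing array.
    static byte[] naiveCopy(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];
        System.arraycopy(buf.array(), buf.position(), out, 0, out.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] prefix = "extra".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "Hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer slice = slicedPayload(prefix, payload);
        ByteBuffer dup = ByteBuffer.wrap(payload).duplicate();

        System.out.println(dup.arrayOffset());   // 0
        System.out.println(slice.arrayOffset()); // 5
        System.out.println(new String(copyRemaining(slice), StandardCharsets.UTF_8)); // Hello
        System.out.println(new String(naiveCopy(slice), StandardCharsets.UTF_8));     // extra
    }
}
```

A test that only feeds the serializer buffers with `arrayOffset() == 0` cannot tell `copyRemaining` and `naiveCopy` apart, which is why a sliced input is worth adding.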
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+ @Param({"128", "256"})
+ private int bytes;
+
+ @Param({"false", "true"})
+ private boolean direct;
+
+ @Param({"false", "true"})
+ private boolean readonly;
+
+ private ByteBuffer byteBuffer;
+
+ private byte[] bytesArray;
+
+ @Setup
+ public void setup() {
+ final SplittableRandom random = new SplittableRandom();
Review Comment:
nit
To get consistent results across benchmark runs, it is preferable to seed the random number generator with a constant value.
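As a rough illustration of the suggestion, the sketch below fills a byte array from a `SplittableRandom` constructed with a fixed seed. The class name, the helper, and the seed value 42 are assumptions made for the example. Two generators built with the same seed produce the same sequence, so the benchmark input is identical on every run.

```java
import java.util.Arrays;
import java.util.SplittableRandom;

public class SeededPayload {

    // Fills an array with pseudo-random bytes derived from a fixed seed.
    static byte[] randomBytes(long seed, int size) {
        SplittableRandom random = new SplittableRandom(seed);
        byte[] out = new byte[size];
        for (int i = 0; i < size; i++) {
            out[i] = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] first = randomBytes(42L, 128);
        byte[] second = randomBytes(42L, 128);
        System.out.println(Arrays.equals(first, second)); // true
    }
}
```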
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+ @Param({"128", "256"})
+ private int bytes;
+
+ @Param({"false", "true"})
+ private boolean direct;
+
+ @Param({"false", "true"})
+ private boolean readonly;
+
+ private ByteBuffer byteBuffer;
+
+ private byte[] bytesArray;
+
+ @Setup
+ public void setup() {
+ final SplittableRandom random = new SplittableRandom();
+ bytesArray = new byte[bytes];
+ byteBuffer = direct ? ByteBuffer.allocateDirect(bytes) : ByteBuffer.allocate(bytes);
+ for (int i = 0; i < bytes; i++) {
+ final byte b = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
+ byteBuffer.put(i, b);
+ bytesArray[i] = b;
+ }
+
+ if (readonly) {
+ byteBuffer = byteBuffer.asReadOnlyBuffer();
+ }
+ }
+
+ @Benchmark
+ public int byteBufferMurmur2() {
+ return Utils.murmur2(byteBuffer);
Review Comment:
It is preferable to use a Blackhole [1] to consume the returned value. If we don't, we risk the compiler optimising the call away because it thinks the result is never used.
[1] https://github.com/openjdk/jmh/blob/master/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_09_Blackholes.java
##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
}
@Test
- public void testByteBufferSerializer() {
+ public void testByteBufferSerCompatibility() {
final byte[] bytes = "Hello".getBytes(UTF_8);
final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+ final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+ final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();
+ final ByteBuffer heapBuffer5 = heapBuffer2.duplicate();
+ final ByteBuffer heapBuffer6 = heapBuffer0.asReadOnlyBuffer();
+ final ByteBuffer heapBuffer7 = heapBuffer1.asReadOnlyBuffer();
+ final ByteBuffer heapBuffer8 = heapBuffer2.asReadOnlyBuffer();
final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
- assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
- assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
- assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
- assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
- assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
+ assertNull(serializer.serialize(topic, null));
+ assertNull(serializer.serializeToByteBuffer(topic, null));
+ assertArrayEquals(new byte[0], serializer.serialize(topic, ByteBuffer.allocate(0)));
+
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer0);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer1);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer2);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer3);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer4);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer5);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer6);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer7);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer8);
+
+ testByteBufferSerCompatibility0(serializer, bytes, directBuffer0);
+ testByteBufferSerCompatibility0(serializer, bytes, directBuffer1);
+ }
+ }
+
+ private void testByteBufferSerCompatibility0(ByteBufferSerializer serializer,
+ byte[] expectedBytes,
+ ByteBuffer buffer) {
+ final ByteBuffer duplicatedBuf0 = buffer.duplicate();
+ final ByteBuffer duplicatedBuf1 = buffer.duplicate();
+ assertEquals(duplicatedBuf0, duplicatedBuf1);
Review Comment:
Isn't this assertion unnecessary? We just duplicated the buffer in the two lines above. With this assertion we are actually testing whether duplicate() works correctly, and we don't need to test that.
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
+ final int length = data.remaining();
+ final int seed = 0x9747b28c;
+ // 'm' and 'r' are mixing constants generated offline.
+ // They're not really 'magic', they just happen to work well.
+ final int m = 0x5bd1e995;
+ final int r = 24;
+
+ // Initialize the hash to a random value
+ int h = seed ^ length;
+ final int length4 = length / 4;
+ data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);
Review Comment:
I understand that you are doing this because murmur2 works on little endian order. Please add a comment explaining it here.
However (and correct me if I am wrong), this line says, "if the current buffer does not interpret the underlying data as little endian, then create a new buffer which will interpret the underlying data as little endian". This logic seems incorrect to me: what if the underlying data is in big endian format? Using a ByteBuffer to forcefully view it as little endian won't change the underlying data.
Alternatively, let's do two things:
1. If the incoming ByteBuffer in Serializer is not in little endian, then we need to convert the underlying data into little endian (and yes that will be a data copy).
2. Remove this statement from here and replace it with a check that throws an exception if it receives a ByteBuffer that is not configured as little endian.
##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
}
@Test
- public void testByteBufferSerializer() {
+ public void testByteBufferSerCompatibility() {
final byte[] bytes = "Hello".getBytes(UTF_8);
final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+ final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+ final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();
Review Comment:
We should use `@Parameterized` here for all these different parameters that we are testing.
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
Review Comment:
There seems to be a 13% regression for the non-direct buffer cases and a similar improvement for the direct buffer cases. The additional computation we perform for non-direct buffers is `data.arrayOffset() + data.position()`, which is done once (hopefully the compiler is intelligent enough to do that). Could we perhaps take it out of the loop just to be doubly sure?
Any other thoughts on why we see a perf difference between direct and non-direct buffers here? I would expect a performance difference when GC is involved, but in this case we are not creating more objects (except when calling slice()).
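For reference, hoisting the base index out of the loop could look roughly like the sketch below. This is not the PR's code: the class, `intAt`, and `xorWords` are hypothetical, and the XOR accumulation is only a stand-in for the murmur2 mixing steps. The point is that `arrayOffset() + position()` is evaluated once, before the loop, and every word is then read straight from the backing array.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class HoistedOffset {

    // Reads the i-th little-endian 32-bit word, starting at a base index computed by the caller.
    static int intAt(byte[] array, int base, int i) {
        int p = base + i * 4;
        return (array[p] & 0xff)
                | ((array[p + 1] & 0xff) << 8)
                | ((array[p + 2] & 0xff) << 16)
                | ((array[p + 3] & 0xff) << 24);
    }

    // XORs all complete 4-byte words of the readable region (stand-in for the hash loop).
    static int xorWords(ByteBuffer data) {
        if (!data.hasArray()) {
            // Direct and read-only buffers expose no backing array and need another code path.
            throw new IllegalArgumentException("buffer has no accessible backing array");
        }
        final byte[] array = data.array();
        final int base = data.arrayOffset() + data.position(); // computed once, outside the loop
        final int words = data.remaining() / 4;
        int acc = 0;
        for (int i = 0; i < words; i++) {
            acc ^= intAt(array, base, i);
        }
        return acc;
    }

    public static void main(String[] args) {
        ByteBuffer big = ByteBuffer.allocate(11);
        for (int i = 0; i < 11; i++) {
            big.put(i, (byte) (i * 37 + 1));
        }
        big.position(3);
        ByteBuffer slice = big.slice(); // 8 readable bytes, arrayOffset() == 3

        ByteBuffer view = slice.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        int expected = view.getInt(0) ^ view.getInt(4);
        System.out.println(xorWords(slice) == expected); // true
    }
}
```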
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250943138
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -321,13 +322,20 @@ public int partition() {
}
}
- /*
+ /**
* Default hashing function to choose a partition from the serialized key bytes
*/
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
+ /**
+ * Default hashing function to choose a partition from the serialized key bytes
+ */
+ public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) {
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
Review Comment:
Got it.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250475769
##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
data.flip();
return Utils.toArray(data);
}
+
+ /**
+ * Note that this method will modify the position and limit of the input ByteBuffer.
+ *
+ * @param topic topic associated with data
+ * @param data typed data
+ * @return serialized ByteBuffer
+ */
+ @Override
+ public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
Review Comment:
ah - my bad, I meant to say limit = capacity
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1257448258
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+ @Param({"128", "256"})
+ private int bytes;
+
+ @Param({"false", "true"})
+ private boolean direct;
+
+ @Param({"false", "true"})
+ private boolean readonly;
+
+ private ByteBuffer byteBuffer;
+
+ private byte[] bytesArray;
+
+ @Setup
+ public void setup() {
+ final SplittableRandom random = new SplittableRandom();
Review Comment:
After specifying a fixed seed for `SplittableRandom`:
```
Benchmark (bytes) (direct) (readonly) (seed) Mode Cnt Score Error Units
MurMurHashBenchmark.byteArrayMurmur2 128 false false 42 thrpt 10 23981.666 ± 121.983 ops/ms
MurMurHashBenchmark.byteArrayMurmur2 128 false true 42 thrpt 10 23922.919 ± 154.278 ops/ms
MurMurHashBenchmark.byteArrayMurmur2 128 true false 42 thrpt 10 23953.261 ± 141.848 ops/ms
MurMurHashBenchmark.byteArrayMurmur2 128 true true 42 thrpt 10 24006.351 ± 111.603 ops/ms
MurMurHashBenchmark.byteArrayMurmur2 256 false false 42 thrpt 10 12331.088 ± 71.559 ops/ms
MurMurHashBenchmark.byteArrayMurmur2 256 false true 42 thrpt 10 12314.595 ± 59.174 ops/ms
MurMurHashBenchmark.byteArrayMurmur2 256 true false 42 thrpt 10 12308.788 ± 76.674 ops/ms
MurMurHashBenchmark.byteArrayMurmur2 256 true true 42 thrpt 10 12310.486 ± 73.764 ops/ms
MurMurHashBenchmark.byteBufferMurmur2 128 false false 42 thrpt 10 20567.501 ± 152.677 ops/ms
MurMurHashBenchmark.byteBufferMurmur2 128 false true 42 thrpt 10 20647.973 ± 155.602 ops/ms
MurMurHashBenchmark.byteBufferMurmur2 128 true false 42 thrpt 10 26602.151 ± 478.886 ops/ms
MurMurHashBenchmark.byteBufferMurmur2 128 true true 42 thrpt 10 26974.506 ± 227.064 ops/ms
MurMurHashBenchmark.byteBufferMurmur2 256 false false 42 thrpt 10 11296.852 ± 50.338 ops/ms
MurMurHashBenchmark.byteBufferMurmur2 256 false true 42 thrpt 10 11013.108 ± 103.543 ops/ms
MurMurHashBenchmark.byteBufferMurmur2 256 true false 42 thrpt 10 14734.871 ± 156.346 ops/ms
MurMurHashBenchmark.byteBufferMurmur2 256 true true 42 thrpt 10 14710.254 ± 182.270 ops/ms
```
[GitHub] [kafka] LinShunKang commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1602048965
Hi, @divijvaidya @showuon @dengziming @vvcephei
KIP-872 has passed, and I've submitted my implementation of it in this PR. It would be great if you could spare some time to review it; your insights and suggestions are valuable. Thanks in advance!
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1264484155
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
+ final int length = data.remaining();
+ final int seed = 0x9747b28c;
+ // 'm' and 'r' are mixing constants generated offline.
+ // They're not really 'magic', they just happen to work well.
+ final int m = 0x5bd1e995;
+ final int r = 24;
+
+ // Initialize the hash to a random value
+ int h = seed ^ length;
+ final int length4 = length / 4;
+ data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);
Review Comment:
> > I don't think it's necessary to do that
>
> can you help me understand how this works? I am concerned about the fact that murmur2 requires little endian order but incoming data could be in big endian format.
We can approach the problem in the following way: as long as the value of variable `k` obtained in both `Utils#murmur2(ByteBuffer)` and `Utils#murmur2(byte[])` is consistent, we can ensure that the results calculated by `Utils#murmur2(ByteBuffer)` and `Utils#murmur2(byte[])` are the same.
Next, I will use a simple example to explain why the correctness of `Utils#murmur2(ByteBuffer)` is not affected by whether the `ByteBuffer` passed by the user is in `BIG_ENDIAN` or `LITTLE_ENDIAN` order.
```
final ByteBuffer heapBuffer = ByteBuffer.allocate(4).order(BIG_ENDIAN);
heapBuffer.putInt(1234567890); // [73, -106, 2, -46]
heapBuffer.flip();
// [73, -106, 2, -46]
final byte[] bytes = Utils.toArray(heapBuffer);
// obtain variable k from bytes just like Utils#murmur2(byte[])
final int k0 = (bytes[0] & 0xff) + ((bytes[1] & 0xff) << 8) + ((bytes[2] & 0xff) << 16) + ((bytes[3] & 0xff) << 24);
// obtain variable k from bytes just like Utils#murmur2(ByteBuffer)
final int k1 = heapBuffer.slice().order(LITTLE_ENDIAN).getInt(0);
// It will print `k0=-771582391, k1=-771582391`
System.out.println("k0=" + k0 + ", k1=" + k1);
```
NOTE: `Utils#murmur2(ByteBuffer)` does not require the `ByteBuffer` passed by the user to be in `LITTLE_ENDIAN` order. `Utils#murmur2(ByteBuffer)` simply reads the data from the `ByteBuffer` in `LITTLE_ENDIAN` order.
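The same check can be run without any Kafka classes. The sketch below is JDK-only; the class and method names are hypothetical. It writes an int with either byte order, then compares the manual byte assembly used for `byte[]` input with a read through a `LITTLE_ENDIAN` view. Both paths read the same stored bytes with the same significance, so they should agree whichever order the writer used.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndianView {

    // Assembles four bytes into an int, least significant byte first.
    static int fromBytes(byte[] b) {
        return (b[0] & 0xff) + ((b[1] & 0xff) << 8) + ((b[2] & 0xff) << 16) + ((b[3] & 0xff) << 24);
    }

    // Reads the same four bytes through a little-endian view of the buffer.
    static int fromView(ByteBuffer buf) {
        return buf.slice().order(ByteOrder.LITTLE_ENDIAN).getInt(0);
    }

    // Writes 'value' using 'writerOrder', then checks that both read paths agree.
    static boolean agrees(ByteOrder writerOrder, int value) {
        ByteBuffer buf = ByteBuffer.allocate(4).order(writerOrder);
        buf.putInt(value);
        buf.flip();
        byte[] bytes = new byte[4];
        buf.duplicate().get(bytes); // copy without moving buf's own position
        return fromBytes(bytes) == fromView(buf);
    }

    public static void main(String[] args) {
        System.out.println(agrees(ByteOrder.BIG_ENDIAN, 1234567890));    // true
        System.out.println(agrees(ByteOrder.LITTLE_ENDIAN, 1234567890)); // true
    }
}
```

The buffer's configured order only changes how multi-byte values are encoded when written or decoded when read; it never rewrites bytes that are already stored.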
Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1925307450
> @LinShunKang It looks like you need to rebase this PR to resolve conflicts. Ping @divijvaidya @showuon @dengziming, can you help get this merged? Thanks
@mimaison I would like to resolve conflicts, but due to the presence of [#14617](https://github.com/apache/kafka/pull/14617), I don't know how to implement `ByteBufferSerializer#serializeToByteBuffer(String, ByteBuffer)`.
[GitHub] [kafka] kirktrue commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1191281659
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1383,9 +1386,13 @@ private ClusterResourceListeners configureClusterResourceListeners(Serializer<K>
* can be used (the partition is then calculated by built-in
* partitioning logic).
*/
- private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Review Comment:
Unless my eyes deceive me, this is just formatting, correct?
##########
clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java:
##########
@@ -31,13 +34,27 @@ public interface Partitioner extends Configurable, Closeable {
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
- * @param keyBytes The serialized key to partition on( or null if no key)
+ * @param keyBytes The serialized key to partition on(or null if no key)
Review Comment:
nit: Can we change:
`The serialized key to partition on(or null if no key)`
to
`The serialized key to partition on (or null if no key)`
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1192944064
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1383,9 +1386,13 @@ private ClusterResourceListeners configureClusterResourceListeners(Serializer<K>
* can be used (the partition is then calculated by built-in
* partitioning logic).
*/
- private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Review Comment:
Yes, I have already rolled back this change.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250942671
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+ @Param({"128", "256"})
+ private int bytes;
+
+ @Param({"false", "true"})
+ private boolean direct;
+
+ @Param({"false", "true"})
+ private boolean readonly;
+
+ private ByteBuffer byteBuffer;
+
+ private byte[] bytesArray;
+
+ @Setup
+ public void setup() {
+ final SplittableRandom random = new SplittableRandom();
+ bytesArray = new byte[bytes];
+ byteBuffer = direct ? ByteBuffer.allocateDirect(bytes) : ByteBuffer.allocate(bytes);
+ for (int i = 0; i < bytes; i++) {
+ final byte b = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
+ byteBuffer.put(i, b);
+ bytesArray[i] = b;
+ }
+
+ if (readonly) {
+ byteBuffer = byteBuffer.asReadOnlyBuffer();
+ }
+ }
+
+ @Benchmark
+ public int byteBufferMurmur2() {
+ return Utils.murmur2(byteBuffer);
Review Comment:
ah, thanks! I learnt something new today.
[GitHub] [kafka] divijvaidya commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1616054098
> Add @Deprecated annotation for Utils#murmur2(byte[])
We should start using murmur3 from Kafka 4.x onwards anyway, and perhaps an endianness-neutral version of it; otherwise we risk different architectures/clients assigning partitions differently. If you are interested, a KIP would be required to move to murmur3, and with that KIP we can mark murmur2 as deprecated.
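To illustrate the endianness concern, here is a minimal sketch (the class `ByteOrderSketch` and its helper are made up for this example and are not part of Kafka). The same four bytes decode to different ints depending on the buffer's byte order, so a hash that reads ints has to pin the order explicitly to give the same result everywhere:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteOrderSketch {
    // Hypothetical helper: reads the first four bytes as an int in the given byte order.
    public static int readInt(byte[] bytes, ByteOrder order) {
        return ByteBuffer.wrap(bytes).order(order).getInt(0);
    }

    public static void main(String[] args) {
        byte[] bytes = {0x01, 0x02, 0x03, 0x04};
        int big = readInt(bytes, ByteOrder.BIG_ENDIAN);
        int little = readInt(bytes, ByteOrder.LITTLE_ENDIAN);
        System.out.println(Integer.toHexString(big));    // 1020304
        System.out.println(Integer.toHexString(little)); // 4030201
    }
}
```

A hash built on `getInt` without a fixed order would therefore depend on how the caller configured the buffer.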
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1249235892
##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
data.flip();
return Utils.toArray(data);
}
+
+ /**
+ * Note that this method will modify the position and limit of the input ByteBuffer.
+ *
+ * @param topic topic associated with data
+ * @param data typed data
+ * @return serialized ByteBuffer
+ */
+ @Override
+ public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
Review Comment:
I am a little confused about one of the cases:
2. In a ByteBuffer, `remaining = limit - position`, so if `position` > 0 then `remaining` must be lower than `limit`; the two cannot be equal.
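A small sketch of that relationship, using a hypothetical `RemainingSketch` class written only for illustration:

```java
import java.nio.ByteBuffer;

public class RemainingSketch {
    // Hypothetical helper: allocates a buffer, moves the position, and reports remaining().
    public static int remainingAfter(int capacity, int position) {
        ByteBuffer buf = ByteBuffer.allocate(capacity); // limit == capacity after allocate
        buf.position(position);
        return buf.remaining();                         // defined as limit - position
    }

    public static void main(String[] args) {
        System.out.println(remainingAfter(8, 0)); // 8
        System.out.println(remainingAfter(8, 3)); // 5
    }
}
```

With a non-zero position, `remaining()` is always strictly smaller than `limit()`.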
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250943986
##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
data.flip();
return Utils.toArray(data);
}
+
+ /**
+ * Note that this method will modify the position and limit of the input ByteBuffer.
+ *
+ * @param topic topic associated with data
+ * @param data typed data
+ * @return serialized ByteBuffer
+ */
+ @Override
+ public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
Review Comment:
No, actually number 1 is still missing. See my comment at https://github.com/apache/kafka/pull/12685#discussion_r1250526734
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1257492943
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
+ final int length = data.remaining();
+ final int seed = 0x9747b28c;
+ // 'm' and 'r' are mixing constants generated offline.
+ // They're not really 'magic', they just happen to work well.
+ final int m = 0x5bd1e995;
+ final int r = 24;
+
+ // Initialize the hash to a random value
+ int h = seed ^ length;
+ final int length4 = length / 4;
+ data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);
Review Comment:
I don't think it's necessary to do that. From this [commit](https://github.com/apache/kafka/pull/12685/commits/721db38b0580250a122f7dd178172ddec616cbe6), it seems that this change does not have any compatibility issues.
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1271319659
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
Review Comment:
@divijvaidya
I optimized `Utils#murmur2(ByteBuffer)` through this [commit](https://github.com/apache/kafka/pull/12685/commits/3dec26dadf60c43e53ff8b13ef83206aabf82637), and here are the results:
```
Benchmark                              (bytes)  (direct)  (littleEndian)  (seed)   Mode  Cnt      Score     Error  Units
MurMurHashBenchmark.byteArrayMurmur2       128     false           false      42  thrpt   10  23952.036 ± 234.049  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       128     false            true      42  thrpt   10  24006.164 ± 153.647  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       128      true           false      42  thrpt   10  23997.223 ± 155.545  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       128      true            true      42  thrpt   10  24014.765 ± 169.769  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       256     false           false      42  thrpt   10  12314.915 ±  71.558  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       256     false            true      42  thrpt   10  12312.495 ±  87.004  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       256      true           false      42  thrpt   10  12299.405 ±  85.441  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       256      true            true      42  thrpt   10  12289.485 ± 107.814  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      128     false           false      42  thrpt   10  23436.309 ± 169.624  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      128     false            true      42  thrpt   10  23436.878 ±  93.588  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      128      true           false      42  thrpt   10  25995.246 ± 582.841  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      128      true            true      42  thrpt   10  28573.727 ± 155.530  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      256     false           false      42  thrpt   10  12210.036 ±  69.601  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      256     false            true      42  thrpt   10  12184.773 ±  98.403  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      256      true           false      42  thrpt   10  14500.743 ± 196.826  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      256      true            true      42  thrpt   10  15421.734 ± 113.717  ops/ms
```
After the optimization, the throughput of `Utils#murmur2(ByteBuffer)` in `HeapByteBuffer` **decreased** by only **2%** compared to `Utils#murmur2(byte[])`, while in `DirectByteBuffer`, the throughput **increased** by **8%**.
[GitHub] [kafka] LinShunKang commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1616603443
> > Add @Deprecated annotation for Utils#murmur2(byte[])
>
> We should start using murmur3 from Kafka 4.x onwards anyway, and perhaps an endianness-neutral version of it; otherwise we risk different architectures/clients assigning partitions differently. If you are interested, a KIP would be required to move to murmur3, and with that KIP we can mark murmur2 as deprecated.
1. When I add the @Deprecated annotation to Utils#murmur2(byte[]), the test cases fail with an error when executed.
2. I am interested in switching to murmur3, but the decision of whether to use a byte-order-independent algorithm may require further discussion. Considering that there are Kafka clients in languages other than Java, we need an algorithm that is independent of language and computer architecture.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250947571
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
Review Comment:
That's fair. Prior to this change, the byte[] would have been stored contiguously, so reading 4 bytes would have been faster, whereas HeapByteBuffer may perform random seeks to read the entire int.
It's fine, since I don't see how we can avoid this, but note that it may offset the gains we are getting from removing the data copy in this PR.
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1257483057
##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
data.flip();
return Utils.toArray(data);
}
+
+ /**
+ * Note that this method will modify the position and limit of the input ByteBuffer.
+ *
+ * @param topic topic associated with data
+ * @param data typed data
+ * @return serialized ByteBuffer
+ */
+ @Override
+ public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
Review Comment:
> No, actually number 1 is still missing. See my comment at [#12685 (comment)](https://github.com/apache/kafka/pull/12685#discussion_r1250526734)
The number 1 case has been covered: https://github.com/apache/kafka/pull/12685/commits/85d98090bf5433bc54f91c573dbc3f4a48d77ebc
[GitHub] [kafka] LinShunKang commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1632034547
Hello @showuon @dengziming @vvcephei , I would like to kindly request your code review for my recent contribution related to KIP-872. Your expertise and feedback would be highly appreciated. Thank you!
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250933347
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
Review Comment:
> Seems like we have a 13% regression for non-direct buffer cases and a similar improvement in direct buffer cases. The additional computation we are performing for non-direct buffers is `data.arrayOffset() + data.position()`, which is done once (hopefully the compiler is intelligent enough to do that). Could we perhaps take it out of the loop just to be doubly sure?
>
> Any other thoughts on why we have a perf difference between direct and non-direct buffers here? I expect a performance difference between them when GC is involved, but in this case we are not creating more objects (except when calling slice()).
I believe that `data.arrayOffset() + data.position()` is not the key issue, but rather `ByteBuffer#getInt(int)`:
* On aarch64/x86_64/x86/ppc64le/ppc64/i386 architectures, `DirectByteBuffer` uses `Unsafe#getInt(long)` to retrieve 4 bytes in one operation.
* Regardless of architecture, `HeapByteBuffer` uses `ByteBuffer#_get(int)` to retrieve 4 bytes in four operations.
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1249312027
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -321,13 +322,20 @@ public int partition() {
}
}
- /*
+ /**
* Default hashing function to choose a partition from the serialized key bytes
*/
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
+ /**
+ * Default hashing function to choose a partition from the serialized key bytes
+ */
+ public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) {
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
Review Comment:
`Utils#murmur2(ByteBuffer)` does not change the offsets of the input `ByteBuffer` because `Utils#murmur2(ByteBuffer)` only uses the `ByteBuffer#get(int)` and `ByteBuffer#getInt(int)` methods to read data from the `ByteBuffer`, and these methods do not change the offsets of the `ByteBuffer`.
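The difference between absolute and relative reads can be checked with a short sketch (the class `AbsoluteGetSketch` is hypothetical and only stands in for the access pattern described above; it is not the PR's code):

```java
import java.nio.ByteBuffer;

public class AbsoluteGetSketch {
    // Sums the remaining bytes using absolute reads only, the same access
    // pattern described for murmur2(ByteBuffer).
    public static int sumRemaining(ByteBuffer buf) {
        int sum = 0;
        for (int i = buf.position(); i < buf.limit(); i++) {
            sum += buf.get(i); // absolute get(int): position is not advanced
        }
        return sum;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5});
        buf.position(1);
        System.out.println(sumRemaining(buf)); // 14
        System.out.println(buf.position());    // 1 (unchanged)
        System.out.println(buf.get());         // 2 (relative get advances position)
        System.out.println(buf.position());    // 2
    }
}
```

Because only the index-taking overloads are used, the caller can still read the key from the same buffer afterwards.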
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1249416755
##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
data.flip();
return Utils.toArray(data);
}
+
+ /**
+ * Note that this method will modify the position and limit of the input ByteBuffer.
+ *
+ * @param topic topic associated with data
+ * @param data typed data
+ * @return serialized ByteBuffer
+ */
+ @Override
+ public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
Review Comment:
All test cases have been added except for the second one.
[GitHub] [kafka] divijvaidya commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1607609145
@LinShunKang Give me a few days. I will get to it.
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1257491896
##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
}
@Test
- public void testByteBufferSerializer() {
+ public void testByteBufferSerCompatibility() {
final byte[] bytes = "Hello".getBytes(UTF_8);
final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+ final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+ final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();
Review Comment:
OK, done, see https://github.com/apache/kafka/commit/721db38b0580250a122f7dd178172ddec616cbe6
Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]
Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1862895717
@divijvaidya @showuon @dengziming You voted on the KIP ([KIP-872](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=228495828)), can you help this PR progress? It has been open for over a year! Thanks
[GitHub] [kafka] LinShunKang commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1546596937
> LGTM overall. I wish the murmur code was somehow generalized so that there wasn't two versions of it. We'll just have to be careful that it doesn't diverge.
Thank you for reviewing!
Utils#murmur2(byte[]) and Utils#murmur2(ByteBuffer) are only used for Partitioners, so we can:
1. Modify all Partitioners to use Utils#murmur2(ByteBuffer)
2. Add @Deprecated annotation for Utils#murmur2(byte[])
3. Invoke Utils#murmur2(ByteBuffer.wrap(byte[])) in Utils#murmur2(byte[])
In order to accomplish the first step mentioned above, we should add a method called WindowedSerializer#serializeBaseKeyToByteBuffer(String topic, Windowed data) and utilize this method in the WindowedStreamPartitioner#partition() function.
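Step 3 could look roughly like the sketch below. This is an illustration only, not the code in this PR: the class `Murmur2Sketch` is made up, the constants are the ones shown in the diff, and the `ByteBuffer` variant works on a little-endian slice so that it reads ints the same way the byte[] version assembles them.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Murmur2Sketch {
    // Step 3: the byte[] overload simply delegates to the ByteBuffer one.
    public static int murmur2(byte[] data) {
        return murmur2(ByteBuffer.wrap(data));
    }

    @SuppressWarnings("fallthrough")
    public static int murmur2(ByteBuffer data) {
        final int length = data.remaining();
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        // Little-endian view starting at the current position; the caller's
        // position, limit and byte order are left untouched.
        final ByteBuffer le = data.slice().order(ByteOrder.LITTLE_ENDIAN);
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int k = le.getInt(i * 4); // absolute read of 4 bytes
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the last 1 to 3 bytes; the cases fall through on purpose.
        final int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (le.get(tail + 2) & 0xff) << 16;
            case 2:
                h ^= (le.get(tail + 1) & 0xff) << 8;
            case 1:
                h ^= le.get(tail) & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    public static void main(String[] args) {
        byte[] key = "foobar".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        // Same bytes placed at offset 2 of a larger buffer hash identically.
        ByteBuffer padded = ByteBuffer.allocate(key.length + 2);
        padded.position(2);
        padded.put(key);
        padded.flip();
        padded.position(2);
        System.out.println(murmur2(key) == murmur2(padded)); // true
    }
}
```

Keeping a single implementation behind both overloads would also address the concern about the two versions diverging.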
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250918669
##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
data.flip();
return Utils.toArray(data);
}
+
+ /**
+ * Note that this method will modify the position and limit of the input ByteBuffer.
+ *
+ * @param topic topic associated with data
+ * @param data typed data
+ * @return serialized ByteBuffer
+ */
+ @Override
+ public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
Review Comment:
> ah - my bad, I meant to say limit = capacity
OK, then all test cases have been covered.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1248924148
##########
clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java:
##########
@@ -50,4 +51,29 @@ public byte[] serialize(String topic, ByteBuffer data) {
data.flip();
return Utils.toArray(data);
}
+
+ /**
+ * Note that this method will modify the position and limit of the input ByteBuffer.
+ *
+ * @param topic topic associated with data
+ * @param data typed data
+ * @return serialized ByteBuffer
+ */
+ @Override
+ public ByteBuffer serializeToByteBuffer(String topic, ByteBuffer data) {
Review Comment:
Please add the following cases in the unit test to validate the same behaviour before and after this PR:
1. Source ByteBuffer has an underlying array but it is shared with other byte buffers (e.g. this source bytebuffer is carved out using slice)
2. Source ByteBuffer has a non-zero position and limit = remaining.
3. Source ByteBuffer has a non-zero position and limit = position.
4. Source ByteBuffer is backed by a direct buffer.
5. Incoming ByteBuffer is a read-only buffer.
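For reference, the buffers for these cases could be constructed along the following lines. This is only a sketch: `BufferCasesSketch` and its method names are hypothetical, and case 2 is built with limit = capacity, which is how it is clarified later in this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferCasesSketch {
    static final byte[] BYTES = "Hello".getBytes(StandardCharsets.UTF_8);

    // Case 1: a slice shares its parent's backing array at a non-zero offset.
    public static ByteBuffer sliced() {
        ByteBuffer parent = ByteBuffer.allocate(BYTES.length + 2);
        parent.position(2);
        ByteBuffer slice = parent.slice();
        slice.put(BYTES);
        slice.flip();
        return slice;
    }

    // Case 2 (read as limit == capacity): non-zero position, one byte still free.
    public static ByteBuffer nonZeroPosition() {
        return ByteBuffer.allocate(BYTES.length + 1).put(BYTES);
    }

    // Case 3: position == limit, so remaining() is 0.
    public static ByteBuffer positionAtLimit() {
        return ByteBuffer.allocate(BYTES.length).put(BYTES);
    }

    // Case 4: direct (off-heap) buffer.
    public static ByteBuffer direct() {
        ByteBuffer buf = ByteBuffer.allocateDirect(BYTES.length);
        buf.put(BYTES);
        buf.flip();
        return buf;
    }

    // Case 5: read-only view; hasArray() reports false for read-only buffers.
    public static ByteBuffer readOnly() {
        return ByteBuffer.wrap(BYTES).asReadOnlyBuffer();
    }

    public static void main(String[] args) {
        System.out.println("slice arrayOffset = " + sliced().arrayOffset());
        System.out.println("case 2 position   = " + nonZeroPosition().position());
        System.out.println("case 3 remaining  = " + positionAtLimit().remaining());
        System.out.println("case 4 isDirect   = " + direct().isDirect());
        System.out.println("case 5 hasArray   = " + readOnly().hasArray());
    }
}
```

Cases 4 and 5 matter because any code path that relies on `array()` cannot be used for them.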
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -321,13 +322,20 @@ public int partition() {
}
}
- /*
+ /**
* Default hashing function to choose a partition from the serialized key bytes
*/
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
+ /**
+ * Default hashing function to choose a partition from the serialized key bytes
+ */
+ public static int partitionForKey(final ByteBuffer serializedKey, final int numPartitions) {
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
Review Comment:
murmur2() will change the position of the ByteBuffer and (correct me if I am wrong) we want to read it again to actually get the key (using remaining()). Wouldn't we end up reading the wrong key, since the position has already been changed by this method?
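To make the concern concrete, here is a minimal sketch of the difference between hashing with relative reads and hashing a duplicate. The hash below is a simple stand-in, not Kafka's `murmur2`, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical demo, not code from the PR.
public class PositionDemo {
    // Stand-in hash using relative get(): it advances the caller's position to the limit.
    public static int consumingHash(ByteBuffer data) {
        int h = 17;
        while (data.hasRemaining()) {
            h = 31 * h + data.get();
        }
        return h;
    }

    // Same hash computed on a duplicate: the caller's position and limit stay untouched,
    // because duplicate() shares the content but has independent position and limit.
    public static int nonConsumingHash(ByteBuffer data) {
        return consumingHash(data.duplicate());
    }
}
```

After `consumingHash(key)`, `key.remaining()` is 0, so a later read of the key would see no bytes; after `nonConsumingHash(key)` it is unchanged. Absolute reads such as `get(int)` would avoid the problem as well.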
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
Review Comment:
This is performance-sensitive code. Can we have a microbenchmark to ensure that we haven't regressed here? You can use https://github.com/sangupta/murmur/blob/master/src/test/java/com/sangupta/murmur/MurmurPerformanceTests.java
Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]
Posted by "mimaison (via GitHub)" <gi...@apache.org>.
mimaison commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1923787527
@LinShunKang It looks like you need to rebase this PR to resolve conflicts.
Ping @divijvaidya @showuon @dengziming, can you help get this merged? Thanks
Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]
Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1926319143
@LinShunKang, sorry for being late. I had a quick look at https://github.com/apache/kafka/pull/14617, and it looks like `ByteBufferSerializer#serialize` is a public API that cannot be changed without a KIP. You know more than I do, so in your opinion, what should we do from here? Could we keep `ByteBufferSerializer#serialize` as it is and implement `ByteBufferSerializer#serializeToByteBuffer`? Do you think we should bring the people from that [PR](https://github.com/apache/kafka/pull/14617) into this PR for discussion? Or do you have any other thoughts?
Re: [PR] KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying [kafka]
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1929526582
> @LinShunKang , sorry for being late. I had a quick look at #14617, it looks like the `ByteBufferSerializer#serialize` is a public API and cannot be changed without KIP. You know more than I do, so, in your opinion, what should we do from now? Could we not touch the `ByteBufferSerializer#serialize` and only implement `ByteBufferSerializer#serializeToByteBuffer`? Do you think we should include people in this [PR](https://github.com/apache/kafka/pull/14617) to this PR for discussion? Or do you have any other thoughts?
We cannot implement only `ByteBufferSerializer#serializeToByteBuffer`, because if a `Serializer` implements this method, `Serializer#serialize` will never be called, and `ByteBufferSerializer#serialize` has obvious logic problems.
I believe we should address the logic issues in `ByteBufferSerializer#serialize`, but doing so will introduce breaking changes for existing users. In addition, `Serializer` should not have both `serialize` and `serializeToByteBuffer` methods at the same time. Therefore, I suggest we tackle these issues in Kafka 4.0, where we can modify the signature of `Serializer`:
from
```
// before 4.0
public interface Serializer<T> {
    byte[] serialize(String topic, T data);

    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
}
```
to
```
// since 4.0
public interface Serializer<T> {
    ByteBuffer serialize(String topic, T data);

    ByteBuffer serialize(String topic, Headers headers, T data);
}
```
Then we could announce to existing users that we have modified the signature of `Serializer`.
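For illustration only, a serializer written against a `ByteBuffer`-returning signature might look like the sketch below. The `BufferSerializer` interface is a hypothetical local stand-in for the proposed shape, not Kafka's actual `Serializer`, and it omits the `Headers` overload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the proposed shape; none of these names exist in Kafka.
public class BufferSerializerSketch {
    public interface BufferSerializer<T> {
        ByteBuffer serialize(String topic, T data);
    }

    // Wraps the encoded bytes without an extra copy; null input maps to null output.
    public static final BufferSerializer<String> STRING = (topic, data) ->
        data == null ? null : ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));

    // Hands back a view over the same bytes, leaving the caller's position and limit alone.
    public static final BufferSerializer<ByteBuffer> PASS_THROUGH = (topic, data) ->
        data == null ? null : data.duplicate();
}
```

With this shape, a `ByteBuffer` payload would not need to be copied into a `byte[]` before being handed to the producer, which is the saving this PR is after.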
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1192944264
##########
clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java:
##########
@@ -31,13 +34,27 @@ public interface Partitioner extends Configurable, Closeable {
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
- * @param keyBytes The serialized key to partition on( or null if no key)
+ * @param keyBytes The serialized key to partition on(or null if no key)
Review Comment:
I have formatted the documentation.
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1249416231
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
Review Comment:
I added MurMurHashBenchmark:
```
Benchmark                              (bytes)  (direct)  (readonly)   Mode  Cnt      Score     Error  Units
MurMurHashBenchmark.byteArrayMurmur2       128     false       false  thrpt   10  23899.296 ± 286.754  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       128     false        true  thrpt   10  23967.434 ± 175.354  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       128      true       false  thrpt   10  23809.852 ± 285.063  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       128      true        true  thrpt   10  23897.052 ± 270.672  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       256     false       false  thrpt   10  12312.813 ±  88.589  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       256     false        true  thrpt   10  12321.595 ±  90.111  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       256      true       false  thrpt   10  12305.468 ±  79.264  ops/ms
MurMurHashBenchmark.byteArrayMurmur2       256      true        true  thrpt   10  12324.379 ±  83.018  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      128     false       false  thrpt   10  20663.903 ± 149.945  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      128     false        true  thrpt   10  20586.696 ± 283.857  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      128      true       false  thrpt   10  26596.222 ± 638.404  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      128      true        true  thrpt   10  26719.416 ± 405.441  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      256     false       false  thrpt   10  11270.697 ±  89.376  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      256     false        true  thrpt   10  11015.074 ± 103.103  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      256      true       false  thrpt   10  14733.461 ± 116.067  ops/ms
MurMurHashBenchmark.byteBufferMurmur2      256      true        true  thrpt   10  14733.619 ± 250.710  ops/ms
```
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1251009330
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
Review Comment:
I believe that the performance impact of this part will only slightly diminish the optimizations brought by this PR, rather than completely offsetting them. This is because Utils#murmur2 is only applied to calculate the partition to which a key belongs, and the key is often much smaller than the value.
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250937252
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+ @Param({"128", "256"})
+ private int bytes;
+
+ @Param({"false", "true"})
+ private boolean direct;
+
+ @Param({"false", "true"})
+ private boolean readonly;
+
+ private ByteBuffer byteBuffer;
+
+ private byte[] bytesArray;
+
+ @Setup
+ public void setup() {
+ final SplittableRandom random = new SplittableRandom();
+ bytesArray = new byte[bytes];
+ byteBuffer = direct ? ByteBuffer.allocateDirect(bytes) : ByteBuffer.allocate(bytes);
+ for (int i = 0; i < bytes; i++) {
+ final byte b = (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE + 1);
+ byteBuffer.put(i, b);
+ bytesArray[i] = b;
+ }
+
+ if (readonly) {
+ byteBuffer = byteBuffer.asReadOnlyBuffer();
+ }
+ }
+
+ @Benchmark
+ public int byteBufferMurmur2() {
+ return Utils.murmur2(byteBuffer);
Review Comment:
I believe that the approach of returning the result is correct:
https://github.com/openjdk/jmh/blob/47f651b72d05c2c335f8ced5ed33f2fb0dd26720/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_08_DeadCode.java#L79
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250939186
##########
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/MurMurHashBenchmark.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Utils;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.openjdk.jmh.annotations.Mode.Throughput;
+
+@Fork(2)
+@BenchmarkMode({Throughput})
+@OutputTimeUnit(MILLISECONDS)
+@State(Scope.Thread)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class MurMurHashBenchmark {
+
+ @Param({"128", "256"})
+ private int bytes;
+
+ @Param({"false", "true"})
+ private boolean direct;
+
+ @Param({"false", "true"})
+ private boolean readonly;
+
+ private ByteBuffer byteBuffer;
+
+ private byte[] bytesArray;
+
+ @Setup
+ public void setup() {
+ final SplittableRandom random = new SplittableRandom();
Review Comment:
OK
[GitHub] [kafka] LinShunKang commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "LinShunKang (via GitHub)" <gi...@apache.org>.
LinShunKang commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1250940246
##########
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java:
##########
@@ -407,19 +411,66 @@ private Serde<String> getStringSerde(String encoder) {
}
@Test
- public void testByteBufferSerializer() {
+ public void testByteBufferSerCompatibility() {
final byte[] bytes = "Hello".getBytes(UTF_8);
final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+ final ByteBuffer heapBuffer3 = heapBuffer0.duplicate();
+ final ByteBuffer heapBuffer4 = heapBuffer1.duplicate();
+ final ByteBuffer heapBuffer5 = heapBuffer2.duplicate();
+ final ByteBuffer heapBuffer6 = heapBuffer0.asReadOnlyBuffer();
+ final ByteBuffer heapBuffer7 = heapBuffer1.asReadOnlyBuffer();
+ final ByteBuffer heapBuffer8 = heapBuffer2.asReadOnlyBuffer();
final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
- assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
- assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
- assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
- assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
- assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
+ assertNull(serializer.serialize(topic, null));
+ assertNull(serializer.serializeToByteBuffer(topic, null));
+ assertArrayEquals(new byte[0], serializer.serialize(topic, ByteBuffer.allocate(0)));
+
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer0);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer1);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer2);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer3);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer4);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer5);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer6);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer7);
+ testByteBufferSerCompatibility0(serializer, bytes, heapBuffer8);
+
+ testByteBufferSerCompatibility0(serializer, bytes, directBuffer0);
+ testByteBufferSerCompatibility0(serializer, bytes, directBuffer1);
+ }
+ }
+
+ private void testByteBufferSerCompatibility0(ByteBufferSerializer serializer,
+ byte[] expectedBytes,
+ ByteBuffer buffer) {
+ final ByteBuffer duplicatedBuf0 = buffer.duplicate();
+ final ByteBuffer duplicatedBuf1 = buffer.duplicate();
+ assertEquals(duplicatedBuf0, duplicatedBuf1);
Review Comment:
OK, I will delete it.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying
Posted by "divijvaidya (via GitHub)" <gi...@apache.org>.
divijvaidya commented on code in PR #12685:
URL: https://github.com/apache/kafka/pull/12685#discussion_r1264409803
##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -517,6 +519,50 @@ public static int murmur2(final byte[] data) {
return h;
}
+ /**
+ * Generates 32 bit murmur2 hash from ByteBuffer
+ * @param data ByteBuffer to hash
+ * @return 32 bit hash of the given ByteBuffer
+ */
+ @SuppressWarnings("fallthrough")
+ public static int murmur2(ByteBuffer data) {
+ final int length = data.remaining();
+ final int seed = 0x9747b28c;
+ // 'm' and 'r' are mixing constants generated offline.
+ // They're not really 'magic', they just happen to work well.
+ final int m = 0x5bd1e995;
+ final int r = 24;
+
+ // Initialize the hash to a random value
+ int h = seed ^ length;
+ final int length4 = length / 4;
+ data = data.order() == LITTLE_ENDIAN ? data : data.slice().order(LITTLE_ENDIAN);
Review Comment:
> I don't think it's necessary to do that
Can you help me understand how this works? I am concerned that murmur2 requires little-endian order, but the incoming data could be in big-endian format.
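One way to reason about this: a `ByteBuffer`'s byte order only controls how multi-byte accessors such as `getInt` interpret the stored bytes; it does not change the bytes themselves. The sketch below, with hypothetical names, compares a manual low-byte-first assembly (the style commonly used in `byte[]`-based murmur2 code) with a read through a little-endian view obtained the same way as in the diff line above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo, not code from the PR.
public class EndianDemo {
    // Assembles a 32-bit word from four bytes, lowest byte first.
    public static int manualLittleEndian(byte[] data, int offset) {
        return (data[offset] & 0xff)
            | ((data[offset + 1] & 0xff) << 8)
            | ((data[offset + 2] & 0xff) << 16)
            | ((data[offset + 3] & 0xff) << 24);
    }

    // Reads the same word through a little-endian view, whatever order the source buffer uses.
    // slice() creates a new view starting at the current position; the source buffer's own
    // byte order, position, and limit are left unchanged.
    public static int viaLittleEndianView(ByteBuffer data) {
        ByteBuffer view = data.order() == ByteOrder.LITTLE_ENDIAN
            ? data
            : data.slice().order(ByteOrder.LITTLE_ENDIAN);
        return view.getInt(view.position()); // absolute read, does not advance the position
    }
}
```

For the bytes `{1, 2, 3, 4}`, both methods yield `0x04030201`, whether the source buffer is big-endian (the default for `ByteBuffer.wrap`) or little-endian.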