You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/09/29 18:00:01 UTC
[kafka] branch trunk updated: KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 51dbd175b08 KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)
51dbd175b08 is described below
commit 51dbd175b08e78aeca03d6752847aa5f23c98659
Author: LinShunKang <li...@gmail.com>
AuthorDate: Fri Sep 30 01:59:47 2022 +0800
KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../common/serialization/ByteBufferSerializer.java | 31 ++++++++++++++++------
.../common/serialization/SerializationTest.java | 19 +++++++++++++
2 files changed, 42 insertions(+), 8 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
index 9fb12544e0f..5987688759e 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -16,25 +16,40 @@
*/
package org.apache.kafka.common.serialization;
+import org.apache.kafka.common.utils.Utils;
+
import java.nio.ByteBuffer;
+/**
+ * ByteBufferSerializer does not change the ByteBuffer's mark, position, or limit,
+ * and the buffer does not need to be flipped before calling <i>serialize(String, ByteBuffer)</i>. For example:
+ *
+ * <blockquote>
+ * <pre>
+ * ByteBufferSerializer serializer = ...; // Create Serializer
+ * ByteBuffer buffer = ...; // Allocate ByteBuffer
+ * buffer.put(data); // Put data into buffer, do not need to flip
+ * serializer.serialize(topic, buffer); // Serialize buffer
+ * </pre>
+ * </blockquote>
+ */
public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+
+ @Override
public byte[] serialize(String topic, ByteBuffer data) {
- if (data == null)
+ if (data == null) {
return null;
-
- data.rewind();
+ }
if (data.hasArray()) {
- byte[] arr = data.array();
+ final byte[] arr = data.array();
if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
return arr;
}
}
- byte[] ret = new byte[data.remaining()];
- data.get(ret, 0, ret.length);
- data.rewind();
- return ret;
+ final ByteBuffer copyData = data.asReadOnlyBuffer();
+ copyData.flip();
+ return Utils.toArray(copyData);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 85c09dd17ae..eb1fee3943f 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -31,6 +31,8 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Stack;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -368,4 +370,21 @@ public class SerializationTest {
return Serdes.serdeFrom(serializer, deserializer);
}
+
+ @Test
+ public void testByteBufferSerializer() {
+ final byte[] bytes = "Hello".getBytes(UTF_8);
+ final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
+ final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
+ final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+ final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
+ final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
+ try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
+ assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
+ assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
+ assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
+ assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
+ assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
+ }
+ }
}