You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/24 00:04:10 UTC

kafka git commit: KAFKA-3046: Add ByteBuffer Serializer and Deserializer

Repository: kafka
Updated Branches:
  refs/heads/trunk f7fe9ccb4 -> 14b688e00


KAFKA-3046: Add ByteBuffer Serializer and Deserializer

https://issues.apache.org/jira/browse/KAFKA-3046

Author: Xin Wang <be...@163.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #718 from vesense/patch-3


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14b688e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14b688e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14b688e0

Branch: refs/heads/trunk
Commit: 14b688e00ddb54a5c8451411d269721484be875f
Parents: f7fe9cc
Author: Xin Wang <be...@163.com>
Authored: Tue Feb 23 15:03:55 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 23 15:03:55 2016 -0800

----------------------------------------------------------------------
 .../serialization/ByteBufferDeserializer.java   | 34 +++++++++++++++
 .../serialization/ByteBufferSerializer.java     | 46 ++++++++++++++++++++
 .../common/serialization/SerializationTest.java | 23 ++++++++++
 3 files changed, 103 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/14b688e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
new file mode 100644
index 0000000..90c1ba0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ByteBufferDeserializer implements Deserializer<ByteBuffer> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public ByteBuffer deserialize(String topic, byte[] data) {
+        if (data == null)
+            return null;
+
+        return ByteBuffer.wrap(data);
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/14b688e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
new file mode 100644
index 0000000..a954705
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public byte[] serialize(String topic, ByteBuffer data) {
+        if (data == null)
+            return null;
+
+        data.rewind();
+
+        if (data.hasArray()) {
+            byte[] arr = data.array();
+            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
+                return arr;
+            }
+        }
+
+        byte[] ret = new byte[data.remaining()];
+        data.get(ret, 0, ret.length);
+        data.rewind();
+        return ret;
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/14b688e0/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 383bf48..87d9e0a 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -14,6 +14,7 @@ package org.apache.kafka.common.serialization;
 
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -72,6 +73,28 @@ public class SerializationTest {
 
         assertEquals("Should support null in serialization and deserialization",
                 null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null)));
+
+        serializer.close();
+        deserializer.close();
+    }
+
+    @Test
+    public void testByteBufferSerializer() {
+        String mytopic = "testTopic";
+        ByteBuffer buf = ByteBuffer.allocate(10);
+        buf.put("my string".getBytes());
+
+        Serializer<ByteBuffer> serializer = new ByteBufferSerializer();
+        Deserializer<ByteBuffer> deserializer = new ByteBufferDeserializer();
+
+        assertEquals("Should get the original ByteBuffer after serialization and deserialization",
+              buf, deserializer.deserialize(mytopic, serializer.serialize(mytopic, buf)));
+
+        assertEquals("Should support null in serialization and deserialization",
+                null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null)));
+
+        serializer.close();
+        deserializer.close();
     }
 
     private SerDeser<String> getStringSerDeser(String encoder) {