You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/29 18:24:13 UTC

kafka git commit: KAFKA-3029: Mark TopicPartition and OffsetAndMetadata as Serializable

Repository: kafka
Updated Branches:
  refs/heads/trunk 20afdcdd2 -> e1d32bdff


KAFKA-3029: Mark TopicPartition and OffsetAndMetadata as Serializable

Patch for issue KAFKA-3029

Given that the fix is trivial no new test case is needed. I have run the test suite using gradle (as mentioned  https://github.com/apache/kafka/blob/trunk/README.md) and suite runs clean.

Author: Praveen Devarao <pr...@in.ibm.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Gwen Shapira <cs...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #711 from praveend/tp_serializable_branch


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1d32bdf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1d32bdf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1d32bdf

Branch: refs/heads/trunk
Commit: e1d32bdffd0979596058788ffa3b1130cc7d93b8
Parents: 20afdcd
Author: Praveen Devarao <pr...@in.ibm.com>
Authored: Fri Jan 29 09:23:55 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Jan 29 09:23:55 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/OffsetAndMetadata.java     |   4 +-
 .../org/apache/kafka/common/TopicPartition.java |   4 +-
 ...alizeCompatibilityOffsetAndMetadataTest.java |  61 +++++++++++++++++++
 ...erializeCompatibilityTopicPartitionTest.java |  61 +++++++++++++++++++
 .../apache/kafka/common/utils/Serializer.java   |  49 +++++++++++++++
 .../offsetAndMetadataSerializedfile             | Bin 0 -> 144 bytes
 .../serializedData/topicPartitionSerializedfile | Bin 0 -> 125 bytes
 7 files changed, 177 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index 1a93047..66b257d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -12,12 +12,14 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.io.Serializable;
+
 /**
  * The Kafka offset commit API allows users to provide additional metadata (in the form of a string)
  * when an offset is committed. This can be useful (for example) to store information about which
  * node made the commit, what time the commit was made, etc.
  */
-public class OffsetAndMetadata {
+public class OffsetAndMetadata implements Serializable {
     private final long offset;
     private final String metadata;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
index 3348684..383c00d 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.common;
 
+import java.io.Serializable;
+
 /**
  * A topic name and partition number
  */
-public final class TopicPartition {
+public final class TopicPartition implements Serializable {
 
     private int hash = 0;
     private final int partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
new file mode 100644
index 0000000..ce1d4cd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.utils.Serializer;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test case ensures OffsetAndMetadata class is serializable and is serialization compatible.
+ * Note: this ensures that the current code can deserialize data serialized with older versions of the code, but not the reverse.
+ * That is, older code won't necessarily be able to deserialize data serialized with newer code.
+ */
+public class SerializeCompatibilityOffsetAndMetadataTest {
+    private String metadata = "test commit metadata";
+    private String fileName = "serializedData/offsetAndMetadataSerializedfile";
+    private long offset = 10;
+
+    private void checkValues(OffsetAndMetadata deSerOAM) {
+        //assert deserialized values are same as original
+        assertEquals("Offset should be " + offset + " but got " + deSerOAM.offset(), offset, deSerOAM.offset());
+        assertEquals("metadata should be " + metadata + " but got " + deSerOAM.metadata(), metadata, deSerOAM.metadata());
+    }
+
+    @Test
+    public void testSerializationRoundtrip() throws IOException, ClassNotFoundException {
+        //assert OffsetAndMetadata is serializable
+        OffsetAndMetadata origOAM = new OffsetAndMetadata(offset, metadata);
+        byte[] byteArray =  Serializer.serialize(origOAM);
+
+        //deserialize the byteArray and check if the values are same as original
+        Object deserializedObject = Serializer.deserialize(byteArray);
+        assertTrue(deserializedObject instanceof OffsetAndMetadata);
+        checkValues((OffsetAndMetadata) deserializedObject);
+    }
+
+    @Test
+    public void testOffsetMetadataSerializationCompatibility() throws IOException, ClassNotFoundException {
+        // assert serialized OffsetAndMetadata object in file (oamserializedfile under resources folder) is
+        // deserializable into OffsetAndMetadata and is compatible
+        Object deserializedObject = Serializer.deserialize(fileName);
+        assertTrue(deserializedObject instanceof OffsetAndMetadata);
+        checkValues((OffsetAndMetadata) deserializedObject);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
new file mode 100644
index 0000000..7786a73
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.utils.Serializer;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test ensures TopicPartition class is serializable and is serialization compatible.
+ * Note: this ensures that the current code can deserialize data serialized with older versions of the code, but not the reverse.
+ * That is, older code won't necessarily be able to deserialize data serialized with newer code.
+ */
+public class SerializeCompatibilityTopicPartitionTest {
+
+    private String topicName = "mytopic";
+    private String fileName = "serializedData/topicPartitionSerializedfile";
+    private int partNum = 5;
+
+    private void checkValues(TopicPartition deSerTP) {
+        //assert deserialized values are same as original
+        assertEquals("partition number should be " + partNum + " but got " + deSerTP.partition(), partNum, deSerTP.partition());
+        assertEquals("topic should be " + topicName + " but got " + deSerTP.topic(), topicName, deSerTP.topic());
+    }
+
+    @Test
+    public void testSerializationRoundtrip() throws IOException, ClassNotFoundException {
+        //assert TopicPartition is serializable and deserialization renders the clone of original properly
+        TopicPartition origTp = new TopicPartition(topicName, partNum);
+        byte[] byteArray = Serializer.serialize(origTp);
+
+        //deserialize the byteArray and check if the values are same as original
+        Object deserializedObject = Serializer.deserialize(byteArray);
+        assertTrue(deserializedObject instanceof TopicPartition);
+        checkValues((TopicPartition) deserializedObject);
+    }
+
+    @Test
+    public void testTopiPartitionSerializationCompatibility() throws IOException, ClassNotFoundException {
+        // assert serialized TopicPartition object in file (serializedData/topicPartitionSerializedfile) is
+        // deserializable into TopicPartition and is compatible
+        Object deserializedObject = Serializer.deserialize(fileName);
+        assertTrue(deserializedObject instanceof TopicPartition);
+        checkValues((TopicPartition) deserializedObject);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
new file mode 100644
index 0000000..f30c0e1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+
+public class Serializer {
+
+    public static byte[] serialize(Object toSerialize) throws IOException {
+        ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+        try (ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream)) {
+            ooStream.writeObject(toSerialize);
+            return arrayOutputStream.toByteArray();
+        }
+    }
+
+    public static Object deserialize(InputStream inputStream) throws IOException, ClassNotFoundException {
+        try (ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) {
+            return objectInputStream.readObject();
+        }
+    }
+
+    public static Object deserialize(byte[] byteArray) throws IOException, ClassNotFoundException {
+        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray);
+        return deserialize(arrayInputStream);
+    }
+
+    public static Object deserialize(String fileName) throws IOException, ClassNotFoundException {
+        ClassLoader classLoader = Serializer.class.getClassLoader();
+        InputStream fileStream = classLoader.getResourceAsStream(fileName);
+        return deserialize(fileStream);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile
----------------------------------------------------------------------
diff --git a/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile b/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile
new file mode 100644
index 0000000..95319cb
Binary files /dev/null and b/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/resources/serializedData/topicPartitionSerializedfile
----------------------------------------------------------------------
diff --git a/clients/src/test/resources/serializedData/topicPartitionSerializedfile b/clients/src/test/resources/serializedData/topicPartitionSerializedfile
new file mode 100644
index 0000000..2c1c501
Binary files /dev/null and b/clients/src/test/resources/serializedData/topicPartitionSerializedfile differ