You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/29 18:24:13 UTC
kafka git commit: KAFKA-3029: Mark TopicPartition and
OffsetAndMetadata as Serializable
Repository: kafka
Updated Branches:
refs/heads/trunk 20afdcdd2 -> e1d32bdff
KAFKA-3029: Mark TopicPartition and OffsetAndMetadata as Serializable
Patch for issue KAFKA-3029
Given that the fix is trivial, no new test case is needed. I have run the test suite using Gradle (as described in https://github.com/apache/kafka/blob/trunk/README.md) and the suite runs clean.
Author: Praveen Devarao <pr...@in.ibm.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Gwen Shapira <cs...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #711 from praveend/tp_serializable_branch
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1d32bdf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1d32bdf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1d32bdf
Branch: refs/heads/trunk
Commit: e1d32bdffd0979596058788ffa3b1130cc7d93b8
Parents: 20afdcd
Author: Praveen Devarao <pr...@in.ibm.com>
Authored: Fri Jan 29 09:23:55 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Jan 29 09:23:55 2016 -0800
----------------------------------------------------------------------
.../clients/consumer/OffsetAndMetadata.java | 4 +-
.../org/apache/kafka/common/TopicPartition.java | 4 +-
...alizeCompatibilityOffsetAndMetadataTest.java | 61 +++++++++++++++++++
...erializeCompatibilityTopicPartitionTest.java | 61 +++++++++++++++++++
.../apache/kafka/common/utils/Serializer.java | 49 +++++++++++++++
.../offsetAndMetadataSerializedfile | Bin 0 -> 144 bytes
.../serializedData/topicPartitionSerializedfile | Bin 0 -> 125 bytes
7 files changed, 177 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index 1a93047..66b257d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -12,12 +12,14 @@
*/
package org.apache.kafka.clients.consumer;
+import java.io.Serializable;
+
/**
* The Kafka offset commit API allows users to provide additional metadata (in the form of a string)
* when an offset is committed. This can be useful (for example) to store information about which
* node made the commit, what time the commit was made, etc.
*/
-public class OffsetAndMetadata {
+public class OffsetAndMetadata implements Serializable {
private final long offset;
private final String metadata;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
index 3348684..383c00d 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.common;
+import java.io.Serializable;
+
/**
* A topic name and partition number
*/
-public final class TopicPartition {
+public final class TopicPartition implements Serializable {
private int hash = 0;
private final int partition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
new file mode 100644
index 0000000..ce1d4cd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/SerializeCompatibilityOffsetAndMetadataTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.utils.Serializer;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test case ensures OffsetAndMetadata class is serializable and is serialization compatible.
+ * Note: this ensures that the current code can deserialize data serialized with older versions of the code, but not the reverse.
+ * That is, older code won't necessarily be able to deserialize data serialized with newer code.
+ */
+public class SerializeCompatibilityOffsetAndMetadataTest {
+ private String metadata = "test commit metadata";
+ private String fileName = "serializedData/offsetAndMetadataSerializedfile";
+ private long offset = 10;
+
+ private void checkValues(OffsetAndMetadata deSerOAM) {
+ //assert deserialized values are same as original
+ assertEquals("Offset should be " + offset + " but got " + deSerOAM.offset(), offset, deSerOAM.offset());
+ assertEquals("metadata should be " + metadata + " but got " + deSerOAM.metadata(), metadata, deSerOAM.metadata());
+ }
+
+ @Test
+ public void testSerializationRoundtrip() throws IOException, ClassNotFoundException {
+ //assert OffsetAndMetadata is serializable
+ OffsetAndMetadata origOAM = new OffsetAndMetadata(offset, metadata);
+ byte[] byteArray = Serializer.serialize(origOAM);
+
+ //deserialize the byteArray and check if the values are same as original
+ Object deserializedObject = Serializer.deserialize(byteArray);
+ assertTrue(deserializedObject instanceof OffsetAndMetadata);
+ checkValues((OffsetAndMetadata) deserializedObject);
+ }
+
+ @Test
+ public void testOffsetMetadataSerializationCompatibility() throws IOException, ClassNotFoundException {
+ // assert serialized OffsetAndMetadata object in file (serializedData/offsetAndMetadataSerializedfile) is
+ // deserializable into OffsetAndMetadata and is compatible
+ Object deserializedObject = Serializer.deserialize(fileName);
+ assertTrue(deserializedObject instanceof OffsetAndMetadata);
+ checkValues((OffsetAndMetadata) deserializedObject);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
new file mode 100644
index 0000000..7786a73
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/SerializeCompatibilityTopicPartitionTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.utils.Serializer;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test ensures TopicPartition class is serializable and is serialization compatible.
+ * Note: this ensures that the current code can deserialize data serialized with older versions of the code, but not the reverse.
+ * That is, older code won't necessarily be able to deserialize data serialized with newer code.
+ */
+public class SerializeCompatibilityTopicPartitionTest {
+
+ private String topicName = "mytopic";
+ private String fileName = "serializedData/topicPartitionSerializedfile";
+ private int partNum = 5;
+
+ private void checkValues(TopicPartition deSerTP) {
+ //assert deserialized values are same as original
+ assertEquals("partition number should be " + partNum + " but got " + deSerTP.partition(), partNum, deSerTP.partition());
+ assertEquals("topic should be " + topicName + " but got " + deSerTP.topic(), topicName, deSerTP.topic());
+ }
+
+ @Test
+ public void testSerializationRoundtrip() throws IOException, ClassNotFoundException {
+ //assert TopicPartition is serializable and that deserialization yields a faithful copy of the original
+ TopicPartition origTp = new TopicPartition(topicName, partNum);
+ byte[] byteArray = Serializer.serialize(origTp);
+
+ //deserialize the byteArray and check if the values are same as original
+ Object deserializedObject = Serializer.deserialize(byteArray);
+ assertTrue(deserializedObject instanceof TopicPartition);
+ checkValues((TopicPartition) deserializedObject);
+ }
+
+ @Test
+ public void testTopiPartitionSerializationCompatibility() throws IOException, ClassNotFoundException {
+ // assert serialized TopicPartition object in file (serializedData/topicPartitionSerializedfile) is
+ // deserializable into TopicPartition and is compatible
+ Object deserializedObject = Serializer.deserialize(fileName);
+ assertTrue(deserializedObject instanceof TopicPartition);
+ checkValues((TopicPartition) deserializedObject);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
new file mode 100644
index 0000000..f30c0e1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/Serializer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+
+public class Serializer {
+
+ public static byte[] serialize(Object toSerialize) throws IOException {
+ ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+ try (ObjectOutputStream ooStream = new ObjectOutputStream(arrayOutputStream)) {
+ ooStream.writeObject(toSerialize);
+ return arrayOutputStream.toByteArray();
+ }
+ }
+
+ public static Object deserialize(InputStream inputStream) throws IOException, ClassNotFoundException {
+ try (ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) {
+ return objectInputStream.readObject();
+ }
+ }
+
+ public static Object deserialize(byte[] byteArray) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(byteArray);
+ return deserialize(arrayInputStream);
+ }
+
+ public static Object deserialize(String fileName) throws IOException, ClassNotFoundException {
+ ClassLoader classLoader = Serializer.class.getClassLoader();
+ InputStream fileStream = classLoader.getResourceAsStream(fileName);
+ return deserialize(fileStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile
----------------------------------------------------------------------
diff --git a/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile b/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile
new file mode 100644
index 0000000..95319cb
Binary files /dev/null and b/clients/src/test/resources/serializedData/offsetAndMetadataSerializedfile differ
http://git-wip-us.apache.org/repos/asf/kafka/blob/e1d32bdf/clients/src/test/resources/serializedData/topicPartitionSerializedfile
----------------------------------------------------------------------
diff --git a/clients/src/test/resources/serializedData/topicPartitionSerializedfile b/clients/src/test/resources/serializedData/topicPartitionSerializedfile
new file mode 100644
index 0000000..2c1c501
Binary files /dev/null and b/clients/src/test/resources/serializedData/topicPartitionSerializedfile differ