You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/09/10 00:22:34 UTC

[kafka] branch trunk updated: KAFKA-4932: Add support for UUID serialization and deserialization (KIP-206)

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 164ef94  KAFKA-4932: Add support for UUID serialization and deserialization (KIP-206)
164ef94 is described below

commit 164ef9462e9d18a36f7be856243a1cacf9a300bf
Author: Brandon Kirchner <br...@civitaslearning.com>
AuthorDate: Sun Sep 9 17:22:18 2018 -0700

    KAFKA-4932: Add support for UUID serialization and deserialization (KIP-206)
    
    [KAFKA-4932](https://issues.apache.org/jira/browse/KAFKA-4932)
    
    Added a UUID Serializer / Deserializer.
    
    Added the UUID type to the SerializationTest
    
    Author: Brandon Kirchner <br...@civitaslearning.com>
    
    Reviewers: Jeff Klukas <je...@klukas.net>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #4438 from brandonkirchner/KAFKA-4932.uuid-serde
---
 .../apache/kafka/common/serialization/Serdes.java  | 20 +++++++-
 .../common/serialization/UUIDDeserializer.java     | 60 ++++++++++++++++++++++
 .../kafka/common/serialization/UUIDSerializer.java | 58 +++++++++++++++++++++
 .../common/serialization/SerializationTest.java    |  2 +
 4 files changed, 139 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index 7825ad4..9f1c7ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * Factory for creating serializers / deserializers.
@@ -112,6 +113,12 @@ public class Serdes {
         }
     }
 
+    static public final class UUIDSerde extends WrapperSerde<UUID> { // Serde for java.util.UUID values
+        public UUIDSerde() {
+            super(new UUIDSerializer(), new UUIDDeserializer()); // pair the UUID serializer with its matching deserializer
+        }
+    }
+
     @SuppressWarnings("unchecked")
     static public <T> Serde<T> serdeFrom(Class<T> type) {
         if (String.class.isAssignableFrom(type)) {
@@ -150,9 +157,13 @@ public class Serdes {
             return (Serde<T>) Bytes();
         }
 
+        if (UUID.class.isAssignableFrom(type)) {
+            return (Serde<T>) UUID();
+        }
+
         // TODO: we can also serializes objects of type T using generic Java serialization by default
         throw new IllegalArgumentException("Unknown class for built-in serializer. Supported types are: " +
-            "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes");
+            "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID");
     }
 
     /**
@@ -229,6 +240,13 @@ public class Serdes {
     }
 
     /*
+     * A serde for nullable {@code UUID} type.
+     */
+    static public Serde<UUID> UUID() { // factory that serdeFrom(Class) delegates to for UUID types
+        return new UUIDSerde(); // a new serde instance is created on every call
+    }
+
+    /*
      * A serde for nullable {@code byte[]} type.
      */
     static public Serde<byte[]> ByteArray() {
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
new file mode 100644
index 0000000..a6eb2ea
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ *  Deserializes a UUID from the bytes of its String representation; a null payload yields a null UUID. String encoding defaults to UTF8 and
+ *  can be customized by setting the property key.deserializer.encoding, value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
+ */
+public class UUIDDeserializer implements Deserializer<UUID> {
+    private String encoding = "UTF8";
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // The key-/value-specific property wins; the generic property is only consulted as a fallback.
+        Object configured = configs.get(isKey ? "key.deserializer.encoding" : "value.deserializer.encoding");
+        if (configured == null)
+            configured = configs.get("deserializer.encoding");
+        if (configured instanceof String)
+            encoding = (String) configured;
+    }
+
+    @Override
+    public UUID deserialize(String topic, byte[] data) {
+        // Nothing to decode for a null payload.
+        if (data == null)
+            return null;
+        try {
+            return UUID.fromString(new String(data, encoding));
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when deserializing byte[] to UUID due to unsupported encoding " + encoding, e);
+        } catch (IllegalArgumentException e) {
+            throw new SerializationException("Error parsing data into UUID", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // no resources are held, so there is nothing to release
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java
new file mode 100644
index 0000000..d8e2524
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ *  We are converting UUID to String before serializing. String encoding defaults to UTF8 and can be customized by setting
+ *  the property key.deserializer.encoding, value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last.
+ */
+public class UUIDSerializer implements Serializer<UUID> {
+    private String encoding = "UTF8";
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
+        Object encodingValue = configs.get(propertyName);
+        if (encodingValue == null)
+            encodingValue = configs.get("serializer.encoding");
+        if (encodingValue instanceof String)
+            encoding = (String) encodingValue;
+    }
+
+    @Override
+    public byte[] serialize(String topic, UUID data) {
+        try {
+            if (data == null)
+                return null;
+            else
+                return data.toString().getBytes(encoding);
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when serializing UUID to byte[] due to unsupported encoding " + encoding);
+        }
+    }
+
+    @Override
+    public void close() {
+      // nothing to do
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 38bdbfe..16c35a8 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -45,6 +46,7 @@ public class SerializationTest {
             put(byte[].class, Arrays.asList("my string".getBytes()));
             put(ByteBuffer.class, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
             put(Bytes.class, Arrays.asList(new Bytes("my string".getBytes())));
+            put(UUID.class, Arrays.asList(UUID.randomUUID()));
         }
     };