You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/13 18:49:52 UTC

kafka git commit: KAFKA-4769: Add Float serializer, deserializer, serde

Repository: kafka
Updated Branches:
  refs/heads/trunk 046519d07 -> cd69daa41


KAFKA-4769: Add Float serializer, deserializer, serde

Author: Michael G. Noll <mi...@confluent.io>

Reviewers: Dongjin Lee, Eno Thereska, Damian Guy, Colin P. McCabe, Matthias J. Sax, Guozhang Wang

Closes #2554 from miguno/KAFKA-4769


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd69daa4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd69daa4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd69daa4

Branch: refs/heads/trunk
Commit: cd69daa4150631e9cea4e299c5ea08e12359118d
Parents: 046519d
Author: Michael G. Noll <mi...@confluent.io>
Authored: Mon Mar 13 11:49:49 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Mar 13 11:49:49 2017 -0700

----------------------------------------------------------------------
 .../common/serialization/FloatDeserializer.java | 47 ++++++++++
 .../common/serialization/FloatSerializer.java   | 42 +++++++++
 .../kafka/common/serialization/Serdes.java      | 17 ++++
 .../common/serialization/SerializationTest.java | 93 ++++++++++++++++++++
 4 files changed, 199 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cd69daa4/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
new file mode 100644
index 0000000..d5f49f5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.util.Map;
+
+/** Decodes a 4-byte, most-significant-byte-first bit pattern into a {@code Float}. */
+public class FloatDeserializer implements Deserializer<Float> {
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // stateless: neither argument is read
+    }
+
+    /** Returns null for null input and rejects any payload that is not exactly 4 bytes long. */
+    @Override
+    public Float deserialize(final String topic, final byte[] data) {
+        if (data == null) {
+            return null;
+        }
+        if (data.length != 4) {
+            throw new SerializationException("Size of data received by Deserializer is not 4");
+        }
+        // Mask each byte to 0..255 before shifting so negative bytes do not smear sign bits.
+        final int bits = (data[0] & 0xFF) << 24
+            | (data[1] & 0xFF) << 16
+            | (data[2] & 0xFF) << 8
+            | (data[3] & 0xFF);
+        return Float.intBitsToFloat(bits);
+    }
+
+    @Override
+    public void close() {
+        // stateless: nothing to release
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd69daa4/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
new file mode 100644
index 0000000..badc258
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+/** Encodes a {@code Float} as the 4 bytes of its raw int bits, most significant byte first. */
+public class FloatSerializer implements Serializer<Float> {
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // stateless: neither argument is read
+    }
+
+    /** Returns null for a null value; otherwise a new 4-byte array holding the raw bit pattern. */
+    @Override
+    public byte[] serialize(final String topic, final Float data) {
+        if (data == null) {
+            return null;
+        }
+        final int rawBits = Float.floatToRawIntBits(data);
+        final byte[] encoded = new byte[4];
+        for (int i = 0; i < encoded.length; i++) {
+            encoded[i] = (byte) (rawBits >>> (24 - 8 * i));
+        }
+        return encoded;
+    }
+
+    @Override
+    public void close() {
+        // stateless: nothing to release
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd69daa4/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index 15f9748..0793321 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -70,6 +70,12 @@ public class Serdes {
         }
     }
 
+    static public final class FloatSerde extends WrapperSerde<Float> {
+        public FloatSerde() { // pairs a new FloatSerializer with a new FloatDeserializer
+            super(new FloatSerializer(), new FloatDeserializer());
+        }
+    }
+
     static public final class DoubleSerde extends WrapperSerde<Double> {
         public DoubleSerde() {
             super(new DoubleSerializer(), new DoubleDeserializer());
@@ -114,6 +120,10 @@ public class Serdes {
             return (Serde<T>) Long();
         }
 
+        if (Float.class.isAssignableFrom(type)) {
+            return (Serde<T>) Float();
+        }
+
         if (Double.class.isAssignableFrom(type)) {
             return (Serde<T>) Double();
         }
@@ -166,6 +176,13 @@ public class Serdes {
     }
 
     /*
+     * A serde for nullable {@code Float} type. Each call returns a new {@code FloatSerde} instance.
+     */
+    static public Serde<Float> Float() {
+        return new FloatSerde();
+    }
+
+    /*
      * A serde for nullable {@code Double} type.
      */
     static public Serde<Double> Double() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cd69daa4/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 50b4594..12ccbe4 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.errors.SerializationException;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -24,7 +25,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class SerializationTest {
 
@@ -123,6 +128,94 @@ public class SerializationTest {
     }
 
     @Test
+    public void shouldSerializeDeserializeFloat() {
+        // One positive and one negative sample; each must survive serialize -> deserialize unchanged.
+        final Float[] samples = {5678567.12312f, -5678567.12341f};
+        final Serializer<Float> toBytes = Serdes.Float().serializer();
+        final Deserializer<Float> fromBytes = Serdes.Float().deserializer();
+
+        for (int i = 0; i < samples.length; i++) {
+            final Float original = samples[i];
+            final byte[] wire = toBytes.serialize(topic, original);
+            final Float decoded = fromBytes.deserialize(topic, wire);
+            assertThat("Should round-trip a float", original, equalTo(decoded));
+        }
+
+        toBytes.close();
+        fromBytes.close();
+    }
+
+    @Test
+    public void floatSerializerShouldReturnNullForNull() { // a null value must serialize to null, not throw
+        final Serializer<Float> serializer = Serdes.Float().serializer();
+        assertThat(serializer.serialize(topic, null), nullValue());
+        serializer.close();
+    }
+
+    @Test
+    public void floatDeserializerShouldReturnNullForNull() { // null bytes must deserialize to null, not throw
+        final Deserializer<Float> deserializer = Serdes.Float().deserializer();
+        assertThat(deserializer.deserialize(topic, null), nullValue());
+        deserializer.close();
+    }
+
+    @Test
+    public void floatDeserializerShouldThrowSerializationExceptionOnZeroBytes() {
+        final Deserializer<Float> floatDeserializer = Serdes.Float().deserializer();
+        try {
+            floatDeserializer.deserialize(topic, new byte[]{});
+            fail("Should have thrown a SerializationException because of zero input bytes");
+        } catch (SerializationException expected) {
+            // Reaching here is the pass condition; only the exception type is checked, not its message.
+        }
+        floatDeserializer.close();
+    }
+
+    @Test
+    public void floatDeserializerShouldThrowSerializationExceptionOnTooFewBytes() {
+        final Deserializer<Float> floatDeserializer = Serdes.Float().deserializer();
+        try {
+            floatDeserializer.deserialize(topic, new byte[]{0, 0, 0});
+            fail("Should have thrown a SerializationException because of too few input bytes");
+        } catch (SerializationException expected) {
+            // Reaching here is the pass condition; only the exception type is checked, not its message.
+        }
+        floatDeserializer.close();
+    }
+
+
+    @Test
+    public void floatDeserializerShouldThrowSerializationExceptionOnTooManyBytes() {
+        final Deserializer<Float> floatDeserializer = Serdes.Float().deserializer();
+        try {
+            floatDeserializer.deserialize(topic, new byte[]{0, 0, 0, 0, 0});
+            fail("Should have thrown a SerializationException because of too many input bytes");
+        } catch (SerializationException expected) {
+            // Reaching here is the pass condition; only the exception type is checked, not its message.
+        }
+        floatDeserializer.close();
+    }
+
+    @Test
+    public void floatSerdeShouldPreserveNaNValues() {
+        // Two distinct NaN encodings (exponent bits all set, different non-zero mantissas); each
+        // one must come back from the serde with exactly the bits it went in with.
+        final int[] nanBitPatterns = {0x7f800001, 0x7f800002};
+        final Serde<Float> serde = Serdes.Float();
+
+        for (final int expectedBits : nanBitPatterns) {
+            final float nan = Float.intBitsToFloat(expectedBits);
+            final Float decoded = serde.deserializer().deserialize(topic,
+                serde.serializer().serialize(topic, nan));
+            // NaN never compares equal to itself, so compare raw int bits rather than values.
+            final int actualBits = Float.floatToRawIntBits(decoded);
+            assertThat(actualBits, equalTo(expectedBits));
+        }
+
+        serde.close();
+    }
+
+    @Test
     public void testDoubleSerializer() {
         Double[] doubles = new Double[]{
             5678567.12312d,