You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/13 18:49:52 UTC
kafka git commit: KAFKA-4769: Add Float serializer, deserializer,
serde
Repository: kafka
Updated Branches:
refs/heads/trunk 046519d07 -> cd69daa41
KAFKA-4769: Add Float serializer, deserializer, serde
Author: Michael G. Noll <mi...@confluent.io>
Reviewers: Dongjin Lee, Eno Thereska, Damian Guy, Colin P. McCabe, Matthas J. Sax, Guozhang Wang
Closes #2554 from miguno/KAFKA-4769
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cd69daa4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cd69daa4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cd69daa4
Branch: refs/heads/trunk
Commit: cd69daa4150631e9cea4e299c5ea08e12359118d
Parents: 046519d
Author: Michael G. Noll <mi...@confluent.io>
Authored: Mon Mar 13 11:49:49 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Mar 13 11:49:49 2017 -0700
----------------------------------------------------------------------
.../common/serialization/FloatDeserializer.java | 47 ++++++++++
.../common/serialization/FloatSerializer.java | 42 +++++++++
.../kafka/common/serialization/Serdes.java | 17 ++++
.../common/serialization/SerializationTest.java | 93 ++++++++++++++++++++
4 files changed, 199 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd69daa4/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
new file mode 100644
index 0000000..d5f49f5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.util.Map;
+
+public class FloatDeserializer implements Deserializer<Float> {
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ // nothing to do
+ }
+
+ @Override
+ public Float deserialize(final String topic, final byte[] data) {
+ if (data == null)
+ return null;
+ if (data.length != 4) {
+ throw new SerializationException("Size of data received by Deserializer is not 4");
+ }
+
+ int value = 0;
+ for (byte b : data) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ return Float.intBitsToFloat(value);
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd69daa4/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
new file mode 100644
index 0000000..badc258
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+public class FloatSerializer implements Serializer<Float> {
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ // nothing to do
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Float data) {
+ if (data == null)
+ return null;
+
+ long bits = Float.floatToRawIntBits(data);
+ return new byte[] {
+ (byte) (bits >>> 24),
+ (byte) (bits >>> 16),
+ (byte) (bits >>> 8),
+ (byte) bits
+ };
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd69daa4/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index 15f9748..0793321 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -70,6 +70,12 @@ public class Serdes {
}
}
+ static public final class FloatSerde extends WrapperSerde<Float> {
+ public FloatSerde() {
+ super(new FloatSerializer(), new FloatDeserializer());
+ }
+ }
+
static public final class DoubleSerde extends WrapperSerde<Double> {
public DoubleSerde() {
super(new DoubleSerializer(), new DoubleDeserializer());
@@ -114,6 +120,10 @@ public class Serdes {
return (Serde<T>) Long();
}
+ if (Float.class.isAssignableFrom(type)) {
+ return (Serde<T>) Float();
+ }
+
if (Double.class.isAssignableFrom(type)) {
return (Serde<T>) Double();
}
@@ -166,6 +176,13 @@ public class Serdes {
}
/*
+ * A serde for nullable {@code Float} type.
+ */
+ static public Serde<Float> Float() {
+ return new FloatSerde();
+ }
+
+ /*
* A serde for nullable {@code Double} type.
*/
static public Serde<Double> Double() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/cd69daa4/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 50b4594..12ccbe4 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.serialization;
+import org.apache.kafka.common.errors.SerializationException;
import org.junit.Test;
import java.nio.ByteBuffer;
@@ -24,7 +25,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
public class SerializationTest {
@@ -123,6 +128,94 @@ public class SerializationTest {
}
@Test
+ public void shouldSerializeDeserializeFloat() {
+ final Float[] floats = new Float[]{
+ 5678567.12312f,
+ -5678567.12341f
+ };
+ final Serializer<Float> serializer = Serdes.Float().serializer();
+ final Deserializer<Float> deserializer = Serdes.Float().deserializer();
+
+ for (final Float value : floats) {
+ assertThat("Should round-trip a float",
+ value, equalTo(deserializer.deserialize(topic, serializer.serialize(topic, value))));
+ }
+
+ serializer.close();
+ deserializer.close();
+ }
+
+ @Test
+ public void floatSerializerShouldReturnNullForNull() {
+ final Serializer<Float> serializer = Serdes.Float().serializer();
+ assertThat(serializer.serialize(topic, null), nullValue());
+ serializer.close();
+ }
+
+ @Test
+ public void floatDeserializerShouldReturnNullForNull() {
+ final Deserializer<Float> deserializer = Serdes.Float().deserializer();
+ assertThat(deserializer.deserialize(topic, null), nullValue());
+ deserializer.close();
+ }
+
+ @Test
+ public void floatDeserializerShouldThrowSerializationExceptionOnZeroBytes() {
+ final Deserializer<Float> deserializer = Serdes.Float().deserializer();
+ try {
+ deserializer.deserialize(topic, new byte[0]);
+ fail("Should have thrown a SerializationException because of zero input bytes");
+ } catch (SerializationException e) {
+ // Ignore (there's no contract on the details of the exception)
+ }
+ deserializer.close();
+ }
+
+ @Test
+ public void floatDeserializerShouldThrowSerializationExceptionOnTooFewBytes() {
+ final Deserializer<Float> deserializer = Serdes.Float().deserializer();
+ try {
+ deserializer.deserialize(topic, new byte[3]);
+ fail("Should have thrown a SerializationException because of too few input bytes");
+ } catch (SerializationException e) {
+ // Ignore (there's no contract on the details of the exception)
+ }
+ deserializer.close();
+ }
+
+
+ @Test
+ public void floatDeserializerShouldThrowSerializationExceptionOnTooManyBytes() {
+ final Deserializer<Float> deserializer = Serdes.Float().deserializer();
+ try {
+ deserializer.deserialize(topic, new byte[5]);
+ fail("Should have thrown a SerializationException because of too many input bytes");
+ } catch (SerializationException e) {
+ // Ignore (there's no contract on the details of the exception)
+ }
+ deserializer.close();
+ }
+
+ @Test
+ public void floatSerdeShouldPreserveNaNValues() {
+ final int someNaNAsIntBits = 0x7f800001;
+ final float someNaN = Float.intBitsToFloat(someNaNAsIntBits);
+ final int anotherNaNAsIntBits = 0x7f800002;
+ final float anotherNaN = Float.intBitsToFloat(anotherNaNAsIntBits);
+
+ final Serde<Float> serde = Serdes.Float();
+ // Because of NaN semantics we must assert based on the raw int bits.
+ final Float roundtrip = serde.deserializer().deserialize(topic,
+ serde.serializer().serialize(topic, someNaN));
+ assertThat(Float.floatToRawIntBits(roundtrip), equalTo(someNaNAsIntBits));
+ final Float otherRoundtrip = serde.deserializer().deserialize(topic,
+ serde.serializer().serialize(topic, anotherNaN));
+ assertThat(Float.floatToRawIntBits(otherRoundtrip), equalTo(anotherNaNAsIntBits));
+
+ serde.close();
+ }
+
+ @Test
public void testDoubleSerializer() {
Double[] doubles = new Double[]{
5678567.12312d,