You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 16:57:15 UTC
[2/2] git commit: [FLINK-1005] Extend TypeSerializer interface to
handle non-mutable object deserialization more efficiently.
[FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/76d4a75e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/76d4a75e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/76d4a75e
Branch: refs/heads/master
Commit: 76d4a75e823c31a899f2143fb6be185b90e55532
Parents: ea4c882
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jul 7 19:39:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 15:10:38 2014 +0200
----------------------------------------------------------------------
.../streamrecord/StreamRecordSerializer.java | 22 +++++-
.../api/common/typeutils/TypeSerializer.java | 42 ++++++-----
.../typeutils/base/BooleanSerializer.java | 14 +++-
.../typeutils/base/BooleanValueSerializer.java | 15 +++-
.../common/typeutils/base/ByteSerializer.java | 15 +++-
.../typeutils/base/ByteValueSerializer.java | 13 +++-
.../common/typeutils/base/CharSerializer.java | 15 +++-
.../typeutils/base/CharValueSerializer.java | 14 +++-
.../common/typeutils/base/DoubleSerializer.java | 16 +++--
.../typeutils/base/DoubleValueSerializer.java | 13 +++-
.../common/typeutils/base/FloatSerializer.java | 16 +++--
.../typeutils/base/FloatValueSerializer.java | 15 +++-
.../typeutils/base/GenericArraySerializer.java | 38 ++++++++--
.../common/typeutils/base/IntSerializer.java | 15 +++-
.../typeutils/base/IntValueSerializer.java | 13 +++-
.../common/typeutils/base/LongSerializer.java | 15 +++-
.../typeutils/base/LongValueSerializer.java | 13 +++-
.../common/typeutils/base/ShortSerializer.java | 15 +++-
.../typeutils/base/ShortValueSerializer.java | 13 +++-
.../common/typeutils/base/StringSerializer.java | 15 +++-
.../typeutils/base/StringValueSerializer.java | 40 ++++++++++-
.../typeutils/base/TypeSerializerSingleton.java | 38 ++++++++++
.../array/BooleanPrimitiveArraySerializer.java | 26 ++++---
.../array/BytePrimitiveArraySerializer.java | 27 ++++---
.../array/CharPrimitiveArraySerializer.java | 27 ++++---
.../array/DoublePrimitiveArraySerializer.java | 29 +++++---
.../array/FloatPrimitiveArraySerializer.java | 27 ++++---
.../base/array/IntPrimitiveArraySerializer.java | 27 ++++---
.../array/LongPrimitiveArraySerializer.java | 29 +++++---
.../array/ShortPrimitiveArraySerializer.java | 27 ++++---
.../base/array/StringArraySerializer.java | 30 +++++---
.../typeutils/record/RecordSerializer.java | 11 ++-
.../common/typeutils/SerializerTestBase.java | 49 ++++++++++++-
.../typeutils/SerializerTestInstance.java | 4 +-
.../base/BooleanValueSerializerTest.java | 56 +++++++++++++++
.../typeutils/base/ByteValueSerializerTest.java | 57 +++++++++++++++
.../typeutils/base/CharValueSerializerTest.java | 56 +++++++++++++++
.../base/DoubleValueSerializerTest.java | 58 +++++++++++++++
.../base/FloatValueSerializerTest.java | 58 +++++++++++++++
.../typeutils/base/IntValueSerializerTest.java | 56 +++++++++++++++
.../typeutils/base/LongValueSerializerTest.java | 56 +++++++++++++++
.../base/ShortValueSerializerTest.java | 57 +++++++++++++++
.../base/StringValueSerializerTest.java | 55 ++++++++++++++
flink-core/src/test/resources/logback-test.xml | 4 ++
.../java/typeutils/runtime/AvroSerializer.java | 35 ++++++++-
.../runtime/CopyableValueSerializer.java | 27 ++++++-
.../java/typeutils/runtime/KryoSerializer.java | 39 ++++++++--
.../java/typeutils/runtime/PojoSerializer.java | 44 ++++++++++++
.../java/typeutils/runtime/TupleSerializer.java | 25 ++++++-
.../typeutils/runtime/TupleSerializerBase.java | 2 +
.../java/typeutils/runtime/ValueSerializer.java | 31 +++++++-
.../typeutils/runtime/WritableSerializer.java | 30 +++++++-
.../SpillingResettableIteratorTest.java | 7 +-
.../testutils/types/IntListSerializer.java | 13 +++-
.../testutils/types/IntPairSerializer.java | 10 +++
.../testutils/types/IntValueSerializer.java | 75 --------------------
.../testutils/types/StringPairSerializer.java | 9 +++
.../scala/typeutils/CaseClassSerializer.scala | 27 ++++++-
.../VertexWithAdjacencyListSerializer.java | 29 ++++----
.../VertexWithRankAndDanglingSerializer.java | 26 ++++---
.../types/VertexWithRankSerializer.java | 26 ++++---
61 files changed, 1405 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 932bae0..85faa9e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -62,6 +62,15 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
throw new RuntimeException("Cannot instantiate StreamRecord.", e);
}
}
+
+ @Override
+ public StreamRecord<T> copy(StreamRecord<T> from) {
+ StreamRecord<T> rec = new StreamRecord<T>();
+ rec.isTuple = from.isTuple;
+ rec.setId(from.getId().copy());
+ rec.setObject(typeSerializer.copy(from.getObject()));
+ return rec;
+ }
@Override
public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
@@ -81,10 +90,18 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
value.getId().write(target);
typeSerializer.serialize(value.getObject(), target);
}
+
+ @Override
+ public StreamRecord<T> deserialize(DataInputView source) throws IOException {
+ StreamRecord<T> record = new StreamRecord<T>();
+ record.isTuple = this.isTuple;
+ record.getId().read(source);
+ record.setObject(typeSerializer.deserialize(source));
+ return record;
+ }
@Override
- public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source)
- throws IOException {
+ public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
reuse.getId().read(source);
reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
return reuse;
@@ -94,5 +111,4 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
public void copy(DataInputView source, DataOutputView target) throws IOException {
// Needs to be implemented
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 9be4a89..87d7e20 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -73,10 +73,21 @@ public abstract class TypeSerializer<T> implements Serializable {
public abstract T createInstance();
/**
- * Creates a copy from the given element, storing the copied result in the given reuse element if type is mutable.
+ * Creates a deep copy of the given element in a new element.
*
* @param from The element to be copied.
- * @param reuse The element to be reused.
+ * @return A deep copy of the element.
+ */
+ public abstract T copy(T from);
+
+ /**
+ * Creates a copy from the given element.
+ * The method makes an attempt to store the copy in the given reuse element, if the type is mutable.
+ * This is, however, not guaranteed.
+ *
+ * @param from The element to be copied.
+ * @param reuse The element to be reused. May or may not be used.
+ * @return A deep copy of the element.
*/
public abstract T copy(T from, T reuse);
@@ -103,10 +114,22 @@ public abstract class TypeSerializer<T> implements Serializable {
public abstract void serialize(T record, DataOutputView target) throws IOException;
/**
+ * De-serializes a record from the given source input view.
+ *
+ * @param source The input view from which to read the data.
+ * @return The deserialized element.
+ *
+ * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
+ * input view, which may have an underlying I/O channel from which it reads.
+ */
+ public abstract T deserialize(DataInputView source) throws IOException;
+
+ /**
* De-serializes a record from the given source input view into the given reuse record instance if mutable.
*
* @param reuse The record instance into which to de-serialize the data.
* @param source The input view from which to read the data.
+ * @return The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
@@ -126,19 +149,4 @@ public abstract class TypeSerializer<T> implements Serializable {
* @throws IOException Thrown if any of the two views raises an exception.
*/
public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
-
- // --------------------------------------------------------------------------------------------
- // Default Utilities: Hash code and equals are pre-defined for singleton serializers, where
- // all instances are equal
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == this.getClass();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 02a72c5..ecfb3c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
-public class BooleanSerializer extends TypeSerializer<Boolean> {
+public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
private static final long serialVersionUID = 1L;
@@ -49,6 +47,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
}
@Override
+ public Boolean copy(Boolean from) {
+ return from;
+ }
+
+ @Override
public Boolean copy(Boolean from, Boolean reuse) {
return from;
}
@@ -64,6 +67,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
}
@Override
+ public Boolean deserialize(DataInputView source) throws IOException {
+ return Boolean.valueOf(source.readBoolean());
+ }
+
+ @Override
public Boolean deserialize(Boolean reuse, DataInputView source) throws IOException {
return Boolean.valueOf(source.readBoolean());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index ddab3bb..4795055 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.BooleanValue;
-public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
+public final class BooleanValueSerializer extends TypeSerializerSingleton<BooleanValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,13 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
}
@Override
+ public BooleanValue copy(BooleanValue from) {
+ BooleanValue result = new BooleanValue();
+ result.setValue(from.getValue());
+ return result;
+ }
+
+ @Override
public BooleanValue copy(BooleanValue from, BooleanValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +71,11 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
}
@Override
+ public BooleanValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new BooleanValue(), source);
+ }
+
+ @Override
public BooleanValue deserialize(BooleanValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index df52c13..32f3edd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class ByteSerializer extends TypeSerializer<Byte> {
+public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class ByteSerializer extends TypeSerializer<Byte> {
}
@Override
+ public Byte copy(Byte from) {
+ return from;
+ }
+
+ @Override
public Byte copy(Byte from, Byte reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class ByteSerializer extends TypeSerializer<Byte> {
}
@Override
- public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
+ public Byte deserialize(DataInputView source) throws IOException {
return Byte.valueOf(source.readByte());
}
+
+ @Override
+ public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index ab3df38..24cc98b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.ByteValue;
-public class ByteValueSerializer extends TypeSerializer<ByteValue> {
+public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
}
@Override
+ public ByteValue copy(ByteValue from) {
+ return copy(from, new ByteValue());
+ }
+
+ @Override
public ByteValue copy(ByteValue from, ByteValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
}
@Override
+ public ByteValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new ByteValue(), source);
+ }
+
+ @Override
public ByteValue deserialize(ByteValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 4ef9a56..c46d3a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class CharSerializer extends TypeSerializer<Character> {
+public final class CharSerializer extends TypeSerializerSingleton<Character> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class CharSerializer extends TypeSerializer<Character> {
}
@Override
+ public Character copy(Character from) {
+ return from;
+ }
+
+ @Override
public Character copy(Character from, Character reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class CharSerializer extends TypeSerializer<Character> {
}
@Override
- public Character deserialize(Character reuse, DataInputView source) throws IOException {
+ public Character deserialize(DataInputView source) throws IOException {
return Character.valueOf(source.readChar());
}
+
+ @Override
+ public Character deserialize(Character reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 4946743..71a8ef4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -20,13 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.CharValue;
-
-public class CharValueSerializer extends TypeSerializer<CharValue> {
+public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
private static final long serialVersionUID = 1L;
@@ -47,6 +45,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
public CharValue createInstance() {
return new CharValue();
}
+
+ @Override
+ public CharValue copy(CharValue from) {
+ return copy(from, new CharValue());
+ }
@Override
public CharValue copy(CharValue from, CharValue reuse) {
@@ -63,6 +66,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
public void serialize(CharValue record, DataOutputView target) throws IOException {
record.write(target);
}
+
+ @Override
+ public CharValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new CharValue(), source);
+ }
@Override
public CharValue deserialize(CharValue reuse, DataInputView source) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index fc8f55d..8e09f7c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
-public class DoubleSerializer extends TypeSerializer<Double> {
+public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
private static final long serialVersionUID = 1L;
@@ -50,6 +48,11 @@ public class DoubleSerializer extends TypeSerializer<Double> {
}
@Override
+ public Double copy(Double from) {
+ return from;
+ }
+
+ @Override
public Double copy(Double from, Double reuse) {
return from;
}
@@ -65,9 +68,14 @@ public class DoubleSerializer extends TypeSerializer<Double> {
}
@Override
- public Double deserialize(Double reuse, DataInputView source) throws IOException {
+ public Double deserialize(DataInputView source) throws IOException {
return Double.valueOf(source.readDouble());
}
+
+ @Override
+ public Double deserialize(Double reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index a19f83e..f4c7f37 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.DoubleValue;
-public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
+public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
}
@Override
+ public DoubleValue copy(DoubleValue from) {
+ return copy(from, new DoubleValue());
+ }
+
+ @Override
public DoubleValue copy(DoubleValue from, DoubleValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
}
@Override
+ public DoubleValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new DoubleValue(), source);
+ }
+
+ @Override
public DoubleValue deserialize(DoubleValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index c0c7917..b1a46b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
-public class FloatSerializer extends TypeSerializer<Float> {
+public final class FloatSerializer extends TypeSerializerSingleton<Float> {
private static final long serialVersionUID = 1L;
@@ -50,6 +48,11 @@ public class FloatSerializer extends TypeSerializer<Float> {
}
@Override
+ public Float copy(Float from) {
+ return from;
+ }
+
+ @Override
public Float copy(Float from, Float reuse) {
return from;
}
@@ -65,9 +68,14 @@ public class FloatSerializer extends TypeSerializer<Float> {
}
@Override
- public Float deserialize(Float reuse, DataInputView source) throws IOException {
+ public Float deserialize(DataInputView source) throws IOException {
return Float.valueOf(source.readFloat());
}
+
+ @Override
+ public Float deserialize(Float reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index 5660dea..6ebb268 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.FloatValue;
-public class FloatValueSerializer extends TypeSerializer<FloatValue> {
+public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
}
@Override
+ public FloatValue copy(FloatValue from) {
+ return copy(from, new FloatValue());
+ }
+
+ @Override
public FloatValue copy(FloatValue from, FloatValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
}
@Override
+ public FloatValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new FloatValue(), source);
+ }
+
+ @Override
public FloatValue deserialize(FloatValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
@@ -72,6 +81,6 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.writeDouble(source.readFloat());
+ target.writeFloat(source.readFloat());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index cfdb132..504b41b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -27,9 +27,11 @@ import org.apache.flink.core.memory.DataOutputView;
/**
+ * A serializer for arrays of objects.
+ *
* @param <C> The component type
*/
-public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
+public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
private static final long serialVersionUID = 1L;
@@ -40,7 +42,6 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
private final C[] EMPTY;
-
public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
if (componentClass == null || componentSerializer == null) {
throw new NullPointerException();
@@ -68,7 +69,7 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
}
@Override
- public C[] copy(C[] from, C[] reuse) {
+ public C[] copy(C[] from) {
C[] copy = create(from.length);
for (int i = 0; i < copy.length; i++) {
@@ -77,6 +78,11 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
return copy;
}
+
+ @Override
+ public C[] copy(C[] from, C[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -98,6 +104,24 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
}
@Override
+ public C[] deserialize(DataInputView source) throws IOException {
+ int len = source.readInt();
+
+ C[] array = create(len);
+
+ for (int i = 0; i < len; i++) {
+ boolean isNonNull = source.readBoolean();
+ if (isNonNull) {
+ array[i] = componentSerializer.deserialize(source);
+ } else {
+ array[i] = null;
+ }
+ }
+
+ return array;
+ }
+
+ @Override
public C[] deserialize(C[] reuse, DataInputView source) throws IOException {
int len = source.readInt();
@@ -108,7 +132,13 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
for (int i = 0; i < len; i++) {
boolean isNonNull = source.readBoolean();
if (isNonNull) {
- reuse[i] = componentSerializer.deserialize(componentSerializer.createInstance(), source);
+ C ri = reuse[i];
+ if (ri == null) {
+ ri = componentSerializer.deserialize(source);
+ } else {
+ ri = componentSerializer.deserialize(ri, source);
+ }
+ reuse[i] = ri;
} else {
reuse[i] = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 28192cd..2937b2a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class IntSerializer extends TypeSerializer<Integer> {
+public final class IntSerializer extends TypeSerializerSingleton<Integer> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class IntSerializer extends TypeSerializer<Integer> {
}
@Override
+ public Integer copy(Integer from) {
+ return from;
+ }
+
+ @Override
public Integer copy(Integer from, Integer reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class IntSerializer extends TypeSerializer<Integer> {
}
@Override
- public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+ public Integer deserialize(DataInputView source) throws IOException {
return Integer.valueOf(source.readInt());
}
+
+ @Override
+ public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index 9cf72d9..ec1f345 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.IntValue;
-public class IntValueSerializer extends TypeSerializer<IntValue> {
+public final class IntValueSerializer extends TypeSerializerSingleton<IntValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
}
@Override
+ public IntValue copy(IntValue from) {
+ return copy(from, new IntValue());
+ }
+
+ @Override
public IntValue copy(IntValue from, IntValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
}
@Override
+ public IntValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new IntValue(), source);
+ }
+
+ @Override
public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 2ca2b84..6b25596 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class LongSerializer extends TypeSerializer<Long> {
+public final class LongSerializer extends TypeSerializerSingleton<Long> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class LongSerializer extends TypeSerializer<Long> {
}
@Override
+ public Long copy(Long from) {
+ return from;
+ }
+
+ @Override
public Long copy(Long from, Long reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class LongSerializer extends TypeSerializer<Long> {
}
@Override
- public Long deserialize(Long reuse, DataInputView source) throws IOException {
+ public Long deserialize(DataInputView source) throws IOException {
return Long.valueOf(source.readLong());
}
+
+ @Override
+ public Long deserialize(Long reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index ea45dcc..95caf04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.LongValue;
-public class LongValueSerializer extends TypeSerializer<LongValue> {
+public final class LongValueSerializer extends TypeSerializerSingleton<LongValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
}
@Override
+ public LongValue copy(LongValue from) {
+ return copy(from, new LongValue());
+ }
+
+ @Override
public LongValue copy(LongValue from, LongValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
}
@Override
+ public LongValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new LongValue(), source);
+ }
+
+ @Override
public LongValue deserialize(LongValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index 156732c..c6e7870 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class ShortSerializer extends TypeSerializer<Short> {
+public final class ShortSerializer extends TypeSerializerSingleton<Short> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class ShortSerializer extends TypeSerializer<Short> {
}
@Override
+ public Short copy(Short from) {
+ return from;
+ }
+
+ @Override
public Short copy(Short from, Short reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class ShortSerializer extends TypeSerializer<Short> {
}
@Override
- public Short deserialize(Short reuse, DataInputView source) throws IOException {
+ public Short deserialize(DataInputView source) throws IOException {
return Short.valueOf(source.readShort());
}
+
+ @Override
+ public Short deserialize(Short reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index ac46b46..ab58987 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.ShortValue;
-public class ShortValueSerializer extends TypeSerializer<ShortValue> {
+public final class ShortValueSerializer extends TypeSerializerSingleton<ShortValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
}
@Override
+ public ShortValue copy(ShortValue from) {
+ return copy(from, new ShortValue());
+ }
+
+ @Override
public ShortValue copy(ShortValue from, ShortValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
}
@Override
+ public ShortValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new ShortValue(), source);
+ }
+
+ @Override
public ShortValue deserialize(ShortValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index d71b521..71221a2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
-public class StringSerializer extends TypeSerializer<String> {
+public final class StringSerializer extends TypeSerializerSingleton<String> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class StringSerializer extends TypeSerializer<String> {
}
@Override
+ public String copy(String from) {
+ return from;
+ }
+
+ @Override
public String copy(String from, String reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class StringSerializer extends TypeSerializer<String> {
}
@Override
- public String deserialize(String record, DataInputView source) throws IOException {
+ public String deserialize(DataInputView source) throws IOException {
return StringValue.readString(source);
}
+
+ @Override
+ public String deserialize(String record, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 352330f..c5d5b55 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -20,16 +20,17 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
-public class StringValueSerializer extends TypeSerializer<StringValue> {
+public final class StringValueSerializer extends TypeSerializerSingleton<StringValue> {
private static final long serialVersionUID = 1L;
+ private static final int HIGH_BIT = 0x1 << 7;
+
public static final StringValueSerializer INSTANCE = new StringValueSerializer();
@@ -49,6 +50,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
}
@Override
+ public StringValue copy(StringValue from) {
+ return copy(from, new StringValue());
+ }
+
+ @Override
public StringValue copy(StringValue from, StringValue reuse) {
reuse.setValue(from);
return reuse;
@@ -65,6 +71,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
}
@Override
+ public StringValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new StringValue(), source);
+ }
+
+ @Override
public StringValue deserialize(StringValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
@@ -72,6 +83,29 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
- StringValue.copyString(source, target);
+ int len = source.readUnsignedByte();
+ target.writeByte(len);
+
+ if (len >= HIGH_BIT) {
+ int shift = 7;
+ int curr;
+ len = len & 0x7f;
+ while ((curr = source.readUnsignedByte()) >= HIGH_BIT) {
+ target.writeByte(curr);
+ len |= (curr & 0x7f) << shift;
+ shift += 7;
+ }
+ target.writeByte(curr);
+ len |= curr << shift;
+ }
+
+ for (int i = 0; i < len; i++) {
+ int c = source.readUnsignedByte();
+ target.writeByte(c);
+ while (c >= HIGH_BIT) {
+ c = source.readUnsignedByte();
+ target.writeByte(c);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
new file mode 100644
index 0000000..979d5ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
+
+ private static final long serialVersionUID = 8766687317209282373L;
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj != null && obj.getClass() == this.getClass();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index 0093463..e9941a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for boolean arrays.
*/
-public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
+public final class BooleanPrimitiveArraySerializer extends TypeSerializerSingleton<boolean[]>{
private static final long serialVersionUID = 1L;
@@ -52,13 +52,18 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
}
@Override
- public boolean[] copy(boolean[] from, boolean[] reuse) {
+ public boolean[] copy(boolean[] from) {
boolean[] copy = new boolean[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
+ public boolean[] copy(boolean[] from, boolean[] reuse) {
+ return copy(from);
+ }
+
+ @Override
public int getLength() {
return -1;
}
@@ -79,15 +84,20 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
@Override
- public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
+ public boolean[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new boolean[len];
+ boolean[] result = new boolean[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readBoolean();
+ result[i] = source.readBoolean();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index fa0638a..aaf867f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for byte arrays.
*/
-public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
+public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<byte[]>{
private static final long serialVersionUID = 1L;
@@ -51,11 +51,16 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
}
@Override
- public byte[] copy(byte[] from, byte[] reuse) {
+ public byte[] copy(byte[] from) {
byte[] copy = new byte[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public byte[] copy(byte[] from, byte[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -74,13 +79,17 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
target.write(record);
}
-
@Override
- public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+ public byte[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new byte[len];
- source.readFully(reuse);
- return reuse;
+ byte[] result = new byte[len];
+ source.readFully(result);
+ return result;
+ }
+
+ @Override
+ public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index 639d4b6..64632bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for char arrays.
*/
-public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
+public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<char[]>{
private static final long serialVersionUID = 1L;
@@ -52,11 +52,16 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
}
@Override
- public char[] copy(char[] from, char[] reuse) {
+ public char[] copy(char[] from) {
char[] copy = new char[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public char[] copy(char[] from, char[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -77,17 +82,21 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
}
}
-
@Override
- public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
+ public char[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new char[len];
+ char[] result = new char[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readChar();
+ result[i] = source.readChar();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index 2b089a3..846ae74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for double arrays.
*/
-public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
+public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleton<double[]>{
private static final long serialVersionUID = 1L;
@@ -50,15 +50,20 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
public double[] createInstance() {
return EMPTY;
}
-
+
@Override
- public double[] copy(double[] from, double[] reuse) {
+ public double[] copy(double[] from) {
double[] copy = new double[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
+ public double[] copy(double[] from, double[] reuse) {
+ return copy(from);
+ }
+
+ @Override
public int getLength() {
return -1;
}
@@ -77,17 +82,21 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
}
}
-
@Override
- public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
+ public double[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new double[len];
+ double[] result = new double[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readDouble();
+ result[i] = source.readDouble();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index 897292e..8f42ac8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for float arrays.
*/
-public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
+public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton<float[]>{
private static final long serialVersionUID = 1L;
@@ -52,11 +52,16 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
}
@Override
- public float[] copy(float[] from, float[] reuse) {
+ public float[] copy(float[] from) {
float[] copy = new float[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public float[] copy(float[] from, float[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -77,17 +82,21 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
}
}
-
@Override
- public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
+ public float[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new float[len];
+ float[] result = new float[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readFloat();
+ result[i] = source.readFloat();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index aeaf35e..2ab056c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for int arrays.
*/
-public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
+public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
private static final long serialVersionUID = 1L;
@@ -52,11 +52,16 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
}
@Override
- public int[] copy(int[] from, int[] reuse) {
+ public int[] copy(int[] from) {
int[] copy = new int[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public int[] copy(int[] from, int[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -77,17 +82,21 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
}
}
-
@Override
- public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
+ public int[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new int[len];
+ int[] result = new int[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readInt();
+ result[i] = source.readInt();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index 953e8ff..5d34dfe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
*/
-public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
+public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<long[]>{
private static final long serialVersionUID = 1L;
@@ -52,10 +52,15 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
}
@Override
+ public long[] copy(long[] from) {
+ long[] result = new long[from.length];
+ System.arraycopy(from, 0, result, 0, from.length);
+ return result;
+ }
+
+ @Override
public long[] copy(long[] from, long[] reuse) {
- reuse = new long[from.length];
- System.arraycopy(from, 0, reuse, 0, from.length);
- return reuse;
+ return copy(from);
}
@Override
@@ -77,17 +82,21 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
}
}
-
@Override
- public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
+ public long[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new long[len];
+ long[] array = new long[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readLong();
+ array[i] = source.readLong();
}
- return reuse;
+ return array;
+ }
+
+ @Override
+ public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index 014fd05..2f37033 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for short arrays.
*/
-public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
+public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton<short[]>{
private static final long serialVersionUID = 1L;
@@ -52,11 +52,16 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
}
@Override
- public short[] copy(short[] from, short[] reuse) {
+ public short[] copy(short[] from) {
short[] copy = new short[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public short[] copy(short[] from, short[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -77,17 +82,21 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
}
}
-
@Override
- public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
+ public short[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new short[len];
+ short[] array = new short[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readShort();
+ array[i] = source.readShort();
}
- return reuse;
+ return array;
+ }
+
+ @Override
+ public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index c6c1826..d5ab030 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
@@ -29,7 +29,7 @@ import org.apache.flink.types.StringValue;
/**
* A serializer for String arrays. Specialized for efficiency.
*/
-public class StringArraySerializer extends TypeSerializer<String[]>{
+public final class StringArraySerializer extends TypeSerializerSingleton<String[]>{
private static final long serialVersionUID = 1L;
@@ -54,10 +54,15 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
}
@Override
+ public String[] copy(String[] from) {
+ String[] target = new String[from.length];
+ System.arraycopy(from, 0, target, 0, from.length);
+ return target;
+ }
+
+ @Override
public String[] copy(String[] from, String[] reuse) {
- reuse = new String[from.length];
- System.arraycopy(from, 0, reuse, 0, from.length);
- return reuse;
+ return copy(from);
}
@Override
@@ -65,7 +70,6 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
return -1;
}
-
@Override
public void serialize(String[] record, DataOutputView target) throws IOException {
if (record == null) {
@@ -79,17 +83,21 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
}
}
-
@Override
- public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
+ public String[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new String[len];
+ String[] array = new String[len];
for (int i = 0; i < len; i++) {
- reuse[i] = StringValue.readString(source);
+ array[i] = StringValue.readString(source);
}
- return reuse;
+ return array;
+ }
+
+ @Override
+ public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
index 7e8762e..7b72e89 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
@@ -67,12 +67,16 @@ public final class RecordSerializer extends TypeSerializer<Record> {
}
@Override
+ public Record copy(Record from) {
+ return from.createCopy();
+ }
+
+ @Override
public Record copy(Record from, Record reuse) {
from.copyTo(reuse);
return reuse;
}
-
@Override
public int getLength() {
return -1;
@@ -86,6 +90,11 @@ public final class RecordSerializer extends TypeSerializer<Record> {
}
@Override
+ public Record deserialize(DataInputView source) throws IOException {
+ return deserialize(new Record(), source);
+ }
+
+ @Override
public Record deserialize(Record target, DataInputView source) throws IOException {
target.deserialize(source);
return target;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 6ca3f6c..d509284 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -89,6 +89,24 @@ public abstract class SerializerTestBase<T> {
}
@Test
+ public void testCopy() {
+ try {
+ TypeSerializer<T> serializer = getSerializer();
+ T[] testData = getData();
+
+ for (T datum : testData) {
+ T copy = serializer.copy(datum);
+ deepEquals("Copied element is not equal to the original element.", datum, copy);
+ }
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Exception in test: " + e.getMessage());
+ }
+ }
+
+ @Test
public void testCopyIntoNewElements() {
try {
TypeSerializer<T> serializer = getSerializer();
@@ -184,7 +202,36 @@ public abstract class SerializerTestBase<T> {
}
@Test
- public void testSerializeAsSequence() {
+ public void testSerializeAsSequenceNoReuse() {
+ try {
+ TypeSerializer<T> serializer = getSerializer();
+ T[] testData = getData();
+
+ TestOutputView out = new TestOutputView();
+ for (T value : testData) {
+ serializer.serialize(value, out);
+ }
+
+ TestInputView in = out.getInputView();
+
+ int num = 0;
+ while (in.available() > 0) {
+ T deserialized = serializer.deserialize(in);
+ deepEquals("Deserialized value if wrong.", testData[num], deserialized);
+ num++;
+ }
+
+ assertEquals("Wrong number of elements deserialized.", testData.length, num);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Exception in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerializeAsSequenceReusingValues() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index 2eb52c0..5b63633 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -69,11 +69,13 @@ public class
public void testAll() {
testInstantiate();
testGetLength();
+ testCopy();
testCopyIntoNewElements();
testCopyIntoReusedElements();
testSerializeIndividually();
testSerializeIndividuallyReusingValues();
- testSerializeAsSequence();
+ testSerializeAsSequenceNoReuse();
+ testSerializeAsSequenceReusingValues();
testSerializedCopyIndividually();
testSerializedCopyAsSequence();
testSerializabilityAndEquals();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
new file mode 100644
index 0000000..43f1a57
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.BooleanValue;
+
+/**
+ * A test for the {@link BooleanValueSerializer}.
+ */
+public class BooleanValueSerializerTest extends SerializerTestBase<BooleanValue> {
+
+ @Override
+ protected TypeSerializer<BooleanValue> createSerializer() {
+ return new BooleanValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 1;
+ }
+
+ @Override
+ protected Class<BooleanValue> getTypeClass() {
+ return BooleanValue.class;
+ }
+
+ @Override
+ protected BooleanValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+
+ return new BooleanValue[] {new BooleanValue(true), new BooleanValue(false),
+ new BooleanValue(rnd.nextBoolean()),
+ new BooleanValue(rnd.nextBoolean()),
+ new BooleanValue(rnd.nextBoolean())};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
new file mode 100644
index 0000000..0e16629
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+/**
+ * A test for the {@link ByteValueSerializer}.
+ */
+public class ByteValueSerializerTest extends SerializerTestBase<ByteValue> {
+
+ @Override
+ protected TypeSerializer<ByteValue> createSerializer() {
+ return new ByteValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 1;
+ }
+
+ @Override
+ protected Class<ByteValue> getTypeClass() {
+ return ByteValue.class;
+ }
+
+ @Override
+ protected ByteValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ byte byteArray[] = new byte[1];
+ rnd.nextBytes(byteArray);
+
+ return new ByteValue[] {new ByteValue((byte) 0), new ByteValue((byte) 1), new ByteValue((byte) -1),
+ new ByteValue(Byte.MAX_VALUE), new ByteValue(Byte.MIN_VALUE),
+ new ByteValue(byteArray[0]), new ByteValue((byte) -byteArray[0])};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
new file mode 100644
index 0000000..ac83666
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+/**
+ * A test for the {@link CharValueSerializer}.
+ */
+public class CharValueSerializerTest extends SerializerTestBase<CharValue> {
+
+ @Override
+ protected TypeSerializer<CharValue> createSerializer() {
+ return new CharValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 2;
+ }
+
+ @Override
+ protected Class<CharValue> getTypeClass() {
+ return CharValue.class;
+ }
+
+ @Override
+ protected CharValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ int rndInt = rnd.nextInt((int) Character.MAX_VALUE);
+
+ return new CharValue[] {new CharValue('a'), new CharValue('@'), new CharValue('ä'),
+ new CharValue('1'), new CharValue((char) rndInt),
+ new CharValue(Character.MAX_VALUE), new CharValue(Character.MIN_VALUE)};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
new file mode 100644
index 0000000..c0a2b24
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+/**
+ * A test for the {@link DoubleValueSerializer}.
+ */
+public class DoubleValueSerializerTest extends SerializerTestBase<DoubleValue> {
+
+ @Override
+ protected TypeSerializer<DoubleValue> createSerializer() {
+ return new DoubleValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 8;
+ }
+
+ @Override
+ protected Class<DoubleValue> getTypeClass() {
+ return DoubleValue.class;
+ }
+
+ @Override
+ protected DoubleValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ Double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
+
+ return new DoubleValue[] {new DoubleValue(0), new DoubleValue(1), new DoubleValue(-1),
+ new DoubleValue(Double.MAX_VALUE), new DoubleValue(Double.MIN_VALUE),
+ new DoubleValue(rndDouble), new DoubleValue(-rndDouble),
+ new DoubleValue(Double.NaN),
+ new DoubleValue(Double.NEGATIVE_INFINITY), new DoubleValue(Double.POSITIVE_INFINITY)};
+ }
+}