You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 16:57:14 UTC
[1/2] [FLINK-1005] Extend TypeSerializer interface to handle
non-mutable object deserialization more efficiently.
Repository: incubator-flink
Updated Branches:
refs/heads/master ea4c8828c -> 76d4a75e8
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
new file mode 100644
index 0000000..13aba1f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+/**
+ * A test for the {@link FloatValueSerializer}.
+ */
+public class FloatValueSerializerTest extends SerializerTestBase<FloatValue> {
+
+ @Override
+ protected TypeSerializer<FloatValue> createSerializer() {
+ return new FloatValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 4;
+ }
+
+ @Override
+ protected Class<FloatValue> getTypeClass() {
+ return FloatValue.class;
+ }
+
+ @Override
+ protected FloatValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ float rndFloat = rnd.nextFloat() * Float.MAX_VALUE;
+
+ return new FloatValue[] {new FloatValue(0), new FloatValue(1), new FloatValue(-1),
+ new FloatValue(Float.MAX_VALUE), new FloatValue(Float.MIN_VALUE),
+ new FloatValue(rndFloat), new FloatValue(-rndFloat),
+ new FloatValue(Float.NaN),
+ new FloatValue(Float.NEGATIVE_INFINITY), new FloatValue(Float.POSITIVE_INFINITY)};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
new file mode 100644
index 0000000..80d63b0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.IntValue;
+
+/**
+ * A test for the {@link IntValueSerializer}.
+ */
+public class IntValueSerializerTest extends SerializerTestBase<IntValue> {
+
+ @Override
+ protected TypeSerializer<IntValue> createSerializer() {
+ return new IntValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 4;
+ }
+
+ @Override
+ protected Class<IntValue> getTypeClass() {
+ return IntValue.class;
+ }
+
+ @Override
+ protected IntValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ int rndInt = rnd.nextInt();
+
+ return new IntValue[] {new IntValue(0), new IntValue(1), new IntValue(-1),
+ new IntValue(Integer.MAX_VALUE), new IntValue(Integer.MIN_VALUE),
+ new IntValue(rndInt), new IntValue(-rndInt)};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
new file mode 100644
index 0000000..533b809
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.LongValue;
+
+/**
+ * A test for the {@link LongValueSerializer}.
+ */
+public class LongValueSerializerTest extends SerializerTestBase<LongValue> {
+
+ @Override
+ protected TypeSerializer<LongValue> createSerializer() {
+ return new LongValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 8;
+ }
+
+ @Override
+ protected Class<LongValue> getTypeClass() {
+ return LongValue.class;
+ }
+
+ @Override
+ protected LongValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ long rndLong = rnd.nextLong();
+
+ return new LongValue[] {new LongValue(0L), new LongValue(1L), new LongValue(-1L),
+ new LongValue(Long.MAX_VALUE), new LongValue(Long.MIN_VALUE),
+ new LongValue(rndLong), new LongValue(-rndLong)};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
new file mode 100644
index 0000000..4ef6816
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+/**
+ * A test for the {@link ShortValueSerializer}.
+ */
+public class ShortValueSerializerTest extends SerializerTestBase<ShortValue> {
+
+ @Override
+ protected TypeSerializer<ShortValue> createSerializer() {
+ return new ShortValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 2;
+ }
+
+ @Override
+ protected Class<ShortValue> getTypeClass() {
+ return ShortValue.class;
+ }
+
+ @Override
+ protected ShortValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ int rndInt = rnd.nextInt(32767);
+
+ return new ShortValue[] {new ShortValue((short) 0), new ShortValue((short) 1), new ShortValue((short) -1),
+ new ShortValue((short) rndInt), new ShortValue((short) -rndInt),
+ new ShortValue((short) -32767), new ShortValue((short) 32768)};
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
new file mode 100644
index 0000000..c20cd92
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+/**
+ * A test for the {@link StringValueSerializer}.
+ */
+public class StringValueSerializerTest extends SerializerTestBase<StringValue> {
+
+ @Override
+ protected TypeSerializer<StringValue> createSerializer() {
+ return new StringValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @Override
+ protected Class<StringValue> getTypeClass() {
+ return StringValue.class;
+ }
+
+ @Override
+ protected StringValue[] getTestData() {
+ return new StringValue[] {
+ new StringValue("a"),
+ new StringValue(""),
+ new StringValue("bcd"),
+ new StringValue("jbmbmner8 jhk hj \n \t üäßß@µ"),
+ new StringValue(""),
+ new StringValue("non-empty")};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/logback-test.xml b/flink-core/src/test/resources/logback-test.xml
index 8b3bb27..4f484cb 100644
--- a/flink-core/src/test/resources/logback-test.xml
+++ b/flink-core/src/test/resources/logback-test.xml
@@ -26,4 +26,8 @@
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
+
+ <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+ <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index d93030d..31a04c9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -36,7 +36,7 @@ import com.esotericsoftware.kryo.Kryo;
*
* @param <T> The type serialized.
*/
-public class AvroSerializer<T> extends TypeSerializer<T> {
+public final class AvroSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 1L;
@@ -89,10 +89,15 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
}
@Override
+ public T copy(T from) {
+ checkKryoInitialized();
+ return this.kryo.copy(from);
+ }
+
+ @Override
public T copy(T from, T reuse) {
checkKryoInitialized();
- reuse = this.kryo.copy(from);
- return reuse;
+ return this.kryo.copy(from);
}
@Override
@@ -106,6 +111,13 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
this.encoder.setOut(target);
this.writer.write(value, this.encoder);
}
+
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ checkAvroInitialized();
+ this.decoder.setIn(source);
+ return this.reader.read(null, this.decoder);
+ }
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
@@ -146,4 +158,21 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
this.kryo.register(type);
}
}
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return 0x42fba55c + this.type.hashCode() + this.typeToInstantiate.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == AvroSerializer.class) {
+ AvroSerializer<?> other = (AvroSerializer<?>) obj;
+ return this.type == other.type && this.typeToInstantiate == other.typeToInstantiate;
+ } else {
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index fa3a91e..9d12d7e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -55,7 +55,12 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
public T createInstance() {
return InstantiationUtil.instantiate(this.valueClass);
}
-
+
+ @Override
+ public T copy(T from) {
+ return copy(from, createInstance());
+ }
+
@Override
public T copy(T from, T reuse) {
from.copyTo(reuse);
@@ -74,6 +79,11 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
}
@Override
+ public T deserialize(DataInputView source) throws IOException {
+ return deserialize(createInstance(), source);
+ }
+
+ @Override
public T deserialize(T reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
@@ -92,4 +102,19 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
instance = createInstance();
}
}
+
+ @Override
+ public int hashCode() {
+ return this.valueClass.hashCode() + 9231;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == CopyableValueSerializer.class) {
+ CopyableValueSerializer<?> other = (CopyableValueSerializer<?>) obj;
+ return this.valueClass == other.valueClass;
+ } else {
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 9e2a1f3..a3acb20 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -75,10 +76,15 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
@Override
+ public T copy(T from) {
+ checkKryoInitialized();
+ return kryo.copy(from);
+ }
+
+ @Override
public T copy(T from, T reuse) {
checkKryoInitialized();
- reuse = kryo.copy(from);
- return reuse;
+ return kryo.copy(from);
}
@Override
@@ -100,15 +106,19 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
@Override
- public T deserialize(T reuse, DataInputView source) throws IOException {
+ public T deserialize(DataInputView source) throws IOException {
checkKryoInitialized();
if (source != previousIn) {
DataInputViewStream inputStream = new DataInputViewStream(source);
input = new NoFetchingInput(inputStream);
previousIn = source;
}
- reuse = kryo.readObject(input, typeToInstantiate);
- return reuse;
+ return kryo.readObject(input, typeToInstantiate);
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
@@ -121,6 +131,25 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
T tmp = deserialize(copyInstance, source);
serialize(tmp, target);
}
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return type.hashCode() + 31 * typeToInstantiate.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof KryoSerializer) {
+ KryoSerializer<?> other = (KryoSerializer<?>) obj;
+ return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate;
+ } else {
+ return false;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
private final void checkKryoInitialized() {
if (this.kryo == null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index ed48c42..71f2cd8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -131,6 +131,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
@Override
+ public T copy(T from) {
+ T target;
+ try {
+ target = clazz.newInstance();
+ }
+ catch (Throwable t) {
+ throw new RuntimeException("Cannot instantiate class.", t);
+ }
+
+ try {
+ for (int i = 0; i < numFields; i++) {
+ Object copy = fieldSerializers[i].copy(fields[i].get(from));
+ fields[i].set(target, copy);
+ }
+ }
+ catch (IllegalAccessException e) {
+ throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.");
+ }
+ return target;
+ }
+
+ @Override
public T copy(T from, T reuse) {
try {
for (int i = 0; i < numFields; i++) {
@@ -165,6 +187,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
@Override
+ public T deserialize(DataInputView source) throws IOException {
+ T target;
+ try {
+ target = clazz.newInstance();
+ }
+ catch (Throwable t) {
+ throw new RuntimeException("Cannot instantiate class.", t);
+ }
+
+ try {
+ for (int i = 0; i < numFields; i++) {
+ Object field = fieldSerializers[i].deserialize(source);
+ fields[i].set(target, field);
+ }
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
+ "before.");
+ }
+ return target;
+ }
+
+ @Override
public T deserialize(T reuse, DataInputView source) throws IOException {
try {
for (int i = 0; i < numFields; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 5d5c08f..12bec12 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -31,7 +31,6 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
private static final long serialVersionUID = 1L;
- @SuppressWarnings("unchecked")
public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
super(tupleClass, fieldSerializers);
}
@@ -69,6 +68,11 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
}
@Override
+ public T copy(T from) {
+ return copy(from, instantiateRaw());
+ }
+
+ @Override
public T copy(T from, T reuse) {
for (int i = 0; i < arity; i++) {
Object copy = fieldSerializers[i].copy(from.getField(i), reuse.getField(i));
@@ -91,6 +95,16 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
}
@Override
+ public T deserialize(DataInputView source) throws IOException {
+ T tuple = instantiateRaw();
+ for (int i = 0; i < arity; i++) {
+ Object field = fieldSerializers[i].deserialize(source);
+ tuple.setField(field, i);
+ }
+ return tuple;
+ }
+
+ @Override
public T deserialize(T reuse, DataInputView source) throws IOException {
for (int i = 0; i < arity; i++) {
Object field = fieldSerializers[i].deserialize(reuse.getField(i), source);
@@ -98,4 +112,13 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
}
return reuse;
}
+
+ private T instantiateRaw() {
+ try {
+ return tupleClass.newInstance();
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Cannot instantiate tuple.", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index db71b56..08df7d3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -28,6 +28,8 @@ import java.util.Arrays;
public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
+ private static final long serialVersionUID = 1L;
+
protected final Class<T> tupleClass;
protected final TypeSerializer<Object>[] fieldSerializers;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 8bea523..69a5ff6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -72,10 +72,15 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
}
@Override
+ public T copy(T from) {
+ checkKryoInitialized();
+ return this.kryo.copy(from);
+ }
+
+ @Override
public T copy(T from, T reuse) {
checkKryoInitialized();
- reuse = this.kryo.copy(from);
- return reuse;
+ return this.kryo.copy(from);
}
@Override
@@ -89,6 +94,11 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
}
@Override
+ public T deserialize(DataInputView source) throws IOException {
+ return deserialize(createInstance(), source);
+ }
+
+ @Override
public T deserialize(T reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
@@ -111,4 +121,21 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
this.kryo.register(type);
}
}
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return this.type.hashCode() + 17;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == ValueSerializer.class) {
+ ValueSerializer<?> other = (ValueSerializer<?>) obj;
+ return this.type == other.type;
+ } else {
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 6fcf730..d5a0470 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -48,10 +48,15 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
}
@Override
+ public T copy(T from) {
+ checkKryoInitialized();
+ return this.kryo.copy(from);
+ }
+
+ @Override
public T copy(T from, T reuse) {
checkKryoInitialized();
- reuse = this.kryo.copy(from);
- return reuse;
+ return this.kryo.copy(from);
}
@Override
@@ -65,6 +70,11 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
}
@Override
+ public T deserialize(DataInputView source) throws IOException {
+ return deserialize(createInstance(), source);
+ }
+
+ @Override
public T deserialize(T reuse, DataInputView source) throws IOException {
reuse.readFields(source);
return reuse;
@@ -102,4 +112,20 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
this.kryo.register(typeClass);
}
}
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return this.typeClass.hashCode() + 177;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() == WritableSerializer.class) {
+ WritableSerializer<?> other = (WritableSerializer<?>) obj;
+ return this.typeClass == other.typeClass;
+ } else {
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index f3b63d3..2bc11f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -16,24 +16,21 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.resettable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.NoSuchElementException;
-import org.junit.Assert;
-
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.types.IntValueSerializer;
import org.apache.flink.types.IntValue;
+import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index 05b55da..2134bcd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
@@ -44,9 +45,14 @@ public class IntListSerializer extends TypeSerializer<IntList> {
}
@Override
+ public IntList copy(IntList from) {
+ return new IntList(from.getKey(), Arrays.copyOf(from.getValue(), from.getValue().length));
+ }
+
+ @Override
public IntList copy(IntList from, IntList reuse) {
reuse.setKey(from.getKey());
- reuse.setValue(from.getValue());
+ reuse.setValue(Arrays.copyOf(from.getValue(), from.getValue().length));
return reuse;
}
@@ -74,6 +80,11 @@ public class IntListSerializer extends TypeSerializer<IntList> {
}
@Override
+ public IntList deserialize(DataInputView source) throws IOException {
+ return deserialize(new IntList(), source);
+ }
+
+ @Override
public IntList deserialize(IntList record, DataInputView source) throws IOException {
int key = source.readInt();
record.setKey(key);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index 9533627..361585d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -46,6 +46,11 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
public IntPair createInstance() {
return new IntPair();
}
+
+ @Override
+ public IntPair copy(IntPair from) {
+ return new IntPair(from.getKey(), from.getValue());
+ }
@Override
public IntPair copy(IntPair from, IntPair reuse) {
@@ -67,6 +72,11 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
}
@Override
+ public IntPair deserialize(DataInputView source) throws IOException {
+ return new IntPair(source.readInt(), source.readInt());
+ }
+
+ @Override
public IntPair deserialize(IntPair reuse, DataInputView source) throws IOException {
reuse.setKey(source.readInt());
reuse.setValue(source.readInt());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
deleted file mode 100644
index ab24e92..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.testutils.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.IntValue;
-
-
-public class IntValueSerializer extends TypeSerializer<IntValue> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public boolean isStateful() {
- return false;
- }
- @Override
- public IntValue createInstance() {
- return new IntValue();
- }
-
- @Override
- public IntValue copy(IntValue from, IntValue reuse) {
- reuse.setValue(from.getValue());
- return reuse;
- }
-
-
- @Override
- public int getLength() {
- return 4;
- }
-
- @Override
- public void serialize(IntValue record, DataOutputView target) throws IOException {
- target.writeInt(record.getValue());
- }
-
- @Override
- public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
- reuse.setValue(source.readInt());
- return reuse;
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.write(source, 4);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index 8ba46c2..a38633c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -45,6 +45,10 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
}
@Override
+ public StringPair copy(StringPair from) {
+ return new StringPair(from.getKey(), from.getValue());
+ }
+ @Override
public StringPair copy(StringPair from, StringPair reuse) {
reuse.setKey(from.getKey());
reuse.setValue(from.getValue());
@@ -63,6 +67,11 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
}
@Override
+ public StringPair deserialize(DataInputView source) throws IOException {
+ return new StringPair(StringValue.readString(source), StringValue.readString(source));
+ }
+
+ @Override
public StringPair deserialize(StringPair record, DataInputView source) throws IOException {
record.setKey(StringValue.readString(source));
record.setValue(StringValue.readString(source));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index b452b12..f9cd10c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -31,8 +31,11 @@ abstract class CaseClassSerializer[T <: Product](
scalaFieldSerializers: Array[TypeSerializer[_]])
extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {
+ @transient var fields : Array[AnyRef] = _
+
+
def createInstance: T = {
- val fields: Array[AnyRef] = new Array(arity)
+ initArray()
for (i <- 0 until arity) {
fields(i) = fieldSerializers(i).createInstance()
}
@@ -40,7 +43,11 @@ abstract class CaseClassSerializer[T <: Product](
}
def copy(from: T, reuse: T): T = {
- val fields: Array[AnyRef] = new Array(arity)
+ copy(from)
+ }
+
+ def copy(from: T): T = {
+ initArray()
for (i <- 0 until arity) {
fields(i) = from.productElement(i).asInstanceOf[AnyRef]
}
@@ -55,11 +62,25 @@ abstract class CaseClassSerializer[T <: Product](
}
def deserialize(reuse: T, source: DataInputView): T = {
- val fields: Array[AnyRef] = new Array(arity)
+ initArray()
for (i <- 0 until arity) {
val field = reuse.productElement(i).asInstanceOf[AnyRef]
fields(i) = fieldSerializers(i).deserialize(field, source)
}
createInstance(fields)
}
+
+ def deserialize(source: DataInputView): T = {
+ initArray()
+ for (i <- 0 until arity) {
+ fields(i) = fieldSerializers(i).deserialize(source)
+ }
+ createInstance(fields)
+ }
+
+ def initArray() = {
+ if (fields == null) {
+ fields = new Array[AnyRef](arity)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
index fdfd873..822b4f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public final class VertexWithAdjacencyListSerializer extends TypeSerializer<VertexWithAdjacencyList> {
+public final class VertexWithAdjacencyListSerializer extends TypeSerializerSingleton<VertexWithAdjacencyList> {
private static final long serialVersionUID = 1L;
@@ -45,6 +45,14 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
}
@Override
+ public VertexWithAdjacencyList copy(VertexWithAdjacencyList from) {
+ VertexWithAdjacencyList copy = new VertexWithAdjacencyList(from.getVertexID(), new long[from.getNumTargets()]);
+ copy.setNumTargets(from.getNumTargets());
+ System.arraycopy(from.getTargets(), 0, copy.getTargets(), 0, from.getNumTargets());
+ return copy;
+ }
+
+ @Override
public VertexWithAdjacencyList copy(VertexWithAdjacencyList from, VertexWithAdjacencyList reuse) {
if (reuse.getTargets().length < from.getTargets().length) {
reuse.setTargets(new long[from.getTargets().length]);
@@ -75,6 +83,11 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
}
@Override
+ public VertexWithAdjacencyList deserialize(DataInputView source) throws IOException {
+ return deserialize(new VertexWithAdjacencyList(), source);
+ }
+
+ @Override
public VertexWithAdjacencyList deserialize(VertexWithAdjacencyList target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
@@ -101,16 +114,4 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
target.writeInt(numTargets);
target.write(source, numTargets * 8);
}
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return 3;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == VertexWithAdjacencyListSerializer.class;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
index 5189203..e972cd1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<VertexWithRankAndDangling> {
+public final class VertexWithRankAndDanglingSerializer extends TypeSerializerSingleton<VertexWithRankAndDangling> {
private static final long serialVersionUID = 1L;
@@ -45,6 +45,11 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
}
@Override
+ public VertexWithRankAndDangling copy(VertexWithRankAndDangling from) {
+ return new VertexWithRankAndDangling(from.getVertexID(), from.getRank(), from.isDangling());
+ }
+
+ @Override
public VertexWithRankAndDangling copy(VertexWithRankAndDangling from, VertexWithRankAndDangling reuse) {
reuse.setVertexID(from.getVertexID());
reuse.setRank(from.getRank());
@@ -65,6 +70,11 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
}
@Override
+ public VertexWithRankAndDangling deserialize(DataInputView source) throws IOException {
+ return new VertexWithRankAndDangling(source.readLong(), source.readDouble(), source.readBoolean());
+ }
+
+ @Override
public VertexWithRankAndDangling deserialize(VertexWithRankAndDangling target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
target.setRank(source.readDouble());
@@ -76,16 +86,4 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 17);
}
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return 2;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == VertexWithRankAndDanglingSerializer.class;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
index 1065633..928d4f4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRank> {
+public final class VertexWithRankSerializer extends TypeSerializerSingleton<VertexWithRank> {
private static final long serialVersionUID = 1L;
@@ -43,6 +43,11 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
public VertexWithRank createInstance() {
return new VertexWithRank();
}
+
+ @Override
+ public VertexWithRank copy(VertexWithRank from) {
+ return new VertexWithRank(from.getVertexID(), from.getRank());
+ }
@Override
public VertexWithRank copy(VertexWithRank from, VertexWithRank reuse) {
@@ -63,6 +68,11 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
}
@Override
+ public VertexWithRank deserialize(DataInputView source) throws IOException {
+ return new VertexWithRank(source.readLong(), source.readDouble());
+ }
+
+ @Override
public VertexWithRank deserialize(VertexWithRank target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
target.setRank(source.readDouble());
@@ -73,16 +83,4 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 16);
}
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return 1;
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == VertexWithRankSerializer.class;
- }
}
[2/2] git commit: [FLINK-1005] Extend TypeSerializer interface to
handle non-mutable object deserialization more efficiently.
Posted by se...@apache.org.
[FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/76d4a75e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/76d4a75e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/76d4a75e
Branch: refs/heads/master
Commit: 76d4a75e823c31a899f2143fb6be185b90e55532
Parents: ea4c882
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jul 7 19:39:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 15:10:38 2014 +0200
----------------------------------------------------------------------
.../streamrecord/StreamRecordSerializer.java | 22 +++++-
.../api/common/typeutils/TypeSerializer.java | 42 ++++++-----
.../typeutils/base/BooleanSerializer.java | 14 +++-
.../typeutils/base/BooleanValueSerializer.java | 15 +++-
.../common/typeutils/base/ByteSerializer.java | 15 +++-
.../typeutils/base/ByteValueSerializer.java | 13 +++-
.../common/typeutils/base/CharSerializer.java | 15 +++-
.../typeutils/base/CharValueSerializer.java | 14 +++-
.../common/typeutils/base/DoubleSerializer.java | 16 +++--
.../typeutils/base/DoubleValueSerializer.java | 13 +++-
.../common/typeutils/base/FloatSerializer.java | 16 +++--
.../typeutils/base/FloatValueSerializer.java | 15 +++-
.../typeutils/base/GenericArraySerializer.java | 38 ++++++++--
.../common/typeutils/base/IntSerializer.java | 15 +++-
.../typeutils/base/IntValueSerializer.java | 13 +++-
.../common/typeutils/base/LongSerializer.java | 15 +++-
.../typeutils/base/LongValueSerializer.java | 13 +++-
.../common/typeutils/base/ShortSerializer.java | 15 +++-
.../typeutils/base/ShortValueSerializer.java | 13 +++-
.../common/typeutils/base/StringSerializer.java | 15 +++-
.../typeutils/base/StringValueSerializer.java | 40 ++++++++++-
.../typeutils/base/TypeSerializerSingleton.java | 38 ++++++++++
.../array/BooleanPrimitiveArraySerializer.java | 26 ++++---
.../array/BytePrimitiveArraySerializer.java | 27 ++++---
.../array/CharPrimitiveArraySerializer.java | 27 ++++---
.../array/DoublePrimitiveArraySerializer.java | 29 +++++---
.../array/FloatPrimitiveArraySerializer.java | 27 ++++---
.../base/array/IntPrimitiveArraySerializer.java | 27 ++++---
.../array/LongPrimitiveArraySerializer.java | 29 +++++---
.../array/ShortPrimitiveArraySerializer.java | 27 ++++---
.../base/array/StringArraySerializer.java | 30 +++++---
.../typeutils/record/RecordSerializer.java | 11 ++-
.../common/typeutils/SerializerTestBase.java | 49 ++++++++++++-
.../typeutils/SerializerTestInstance.java | 4 +-
.../base/BooleanValueSerializerTest.java | 56 +++++++++++++++
.../typeutils/base/ByteValueSerializerTest.java | 57 +++++++++++++++
.../typeutils/base/CharValueSerializerTest.java | 56 +++++++++++++++
.../base/DoubleValueSerializerTest.java | 58 +++++++++++++++
.../base/FloatValueSerializerTest.java | 58 +++++++++++++++
.../typeutils/base/IntValueSerializerTest.java | 56 +++++++++++++++
.../typeutils/base/LongValueSerializerTest.java | 56 +++++++++++++++
.../base/ShortValueSerializerTest.java | 57 +++++++++++++++
.../base/StringValueSerializerTest.java | 55 ++++++++++++++
flink-core/src/test/resources/logback-test.xml | 4 ++
.../java/typeutils/runtime/AvroSerializer.java | 35 ++++++++-
.../runtime/CopyableValueSerializer.java | 27 ++++++-
.../java/typeutils/runtime/KryoSerializer.java | 39 ++++++++--
.../java/typeutils/runtime/PojoSerializer.java | 44 ++++++++++++
.../java/typeutils/runtime/TupleSerializer.java | 25 ++++++-
.../typeutils/runtime/TupleSerializerBase.java | 2 +
.../java/typeutils/runtime/ValueSerializer.java | 31 +++++++-
.../typeutils/runtime/WritableSerializer.java | 30 +++++++-
.../SpillingResettableIteratorTest.java | 7 +-
.../testutils/types/IntListSerializer.java | 13 +++-
.../testutils/types/IntPairSerializer.java | 10 +++
.../testutils/types/IntValueSerializer.java | 75 --------------------
.../testutils/types/StringPairSerializer.java | 9 +++
.../scala/typeutils/CaseClassSerializer.scala | 27 ++++++-
.../VertexWithAdjacencyListSerializer.java | 29 ++++----
.../VertexWithRankAndDanglingSerializer.java | 26 ++++---
.../types/VertexWithRankSerializer.java | 26 ++++---
61 files changed, 1405 insertions(+), 301 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 932bae0..85faa9e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -62,6 +62,15 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
throw new RuntimeException("Cannot instantiate StreamRecord.", e);
}
}
+
+ @Override
+ public StreamRecord<T> copy(StreamRecord<T> from) {
+ StreamRecord<T> rec = new StreamRecord<T>();
+ rec.isTuple = from.isTuple;
+ rec.setId(from.getId().copy());
+ rec.setObject(typeSerializer.copy(from.getObject()));
+ return rec;
+ }
@Override
public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
@@ -81,10 +90,18 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
value.getId().write(target);
typeSerializer.serialize(value.getObject(), target);
}
+
+ @Override
+ public StreamRecord<T> deserialize(DataInputView source) throws IOException {
+ StreamRecord<T> record = new StreamRecord<T>();
+ record.isTuple = this.isTuple;
+ record.getId().read(source);
+ record.setObject(typeSerializer.deserialize(source));
+ return record;
+ }
@Override
- public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source)
- throws IOException {
+ public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
reuse.getId().read(source);
reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
return reuse;
@@ -94,5 +111,4 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
public void copy(DataInputView source, DataOutputView target) throws IOException {
// Needs to be implemented
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 9be4a89..87d7e20 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -73,10 +73,21 @@ public abstract class TypeSerializer<T> implements Serializable {
public abstract T createInstance();
/**
- * Creates a copy from the given element, storing the copied result in the given reuse element if type is mutable.
+ * Creates a deep copy of the given element in a new element.
*
* @param from The element reuse be copied.
- * @param reuse The element to be reused.
+ * @return A deep copy of the element.
+ */
+ public abstract T copy(T from);
+
+ /**
+ * Creates a copy from the given element.
+ * The method makes an attempt to store the copy in the given reuse element, if the type is mutable.
+ * This is, however, not guaranteed.
+ *
+ * @param from The element to be copied.
+ * @param reuse The element to be reused. May or may not be used.
+ * @return A deep copy of the element.
*/
public abstract T copy(T from, T reuse);
@@ -103,10 +114,22 @@ public abstract class TypeSerializer<T> implements Serializable {
public abstract void serialize(T record, DataOutputView target) throws IOException;
/**
+ * De-serializes a record from the given source input view.
+ *
+ * @param source The input view from which to read the data.
+ * @result The deserialized element.
+ *
+ * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
+ * input view, which may have an underlying I/O channel from which it reads.
+ */
+ public abstract T deserialize(DataInputView source) throws IOException;
+
+ /**
* De-serializes a record from the given source input view into the given reuse record instance if mutable.
*
* @param reuse The record instance into which to de-serialize the data.
* @param source The input view from which to read the data.
+ * @result The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
@@ -126,19 +149,4 @@ public abstract class TypeSerializer<T> implements Serializable {
* @throws IOException Thrown if any of the two views raises an exception.
*/
public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
-
- // --------------------------------------------------------------------------------------------
- // Default Utilities: Hash code and equals are pre-defined for singleton serializers, where
- // all instances are equal
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj != null && obj.getClass() == this.getClass();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 02a72c5..ecfb3c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
-public class BooleanSerializer extends TypeSerializer<Boolean> {
+public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
private static final long serialVersionUID = 1L;
@@ -49,6 +47,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
}
@Override
+ public Boolean copy(Boolean from) {
+ return from;
+ }
+
+ @Override
public Boolean copy(Boolean from, Boolean reuse) {
return from;
}
@@ -64,6 +67,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
}
@Override
+ public Boolean deserialize(DataInputView source) throws IOException {
+ return Boolean.valueOf(source.readBoolean());
+ }
+
+ @Override
public Boolean deserialize(Boolean reuse, DataInputView source) throws IOException {
return Boolean.valueOf(source.readBoolean());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index ddab3bb..4795055 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.BooleanValue;
-public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
+public final class BooleanValueSerializer extends TypeSerializerSingleton<BooleanValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,13 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
}
@Override
+ public BooleanValue copy(BooleanValue from) {
+ BooleanValue result = new BooleanValue();
+ result.setValue(from.getValue());
+ return result;
+ }
+
+ @Override
public BooleanValue copy(BooleanValue from, BooleanValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +71,11 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
}
@Override
+ public BooleanValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new BooleanValue(), source);
+ }
+
+ @Override
public BooleanValue deserialize(BooleanValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index df52c13..32f3edd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class ByteSerializer extends TypeSerializer<Byte> {
+public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class ByteSerializer extends TypeSerializer<Byte> {
}
@Override
+ public Byte copy(Byte from) {
+ return from;
+ }
+
+ @Override
public Byte copy(Byte from, Byte reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class ByteSerializer extends TypeSerializer<Byte> {
}
@Override
- public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
+ public Byte deserialize(DataInputView source) throws IOException {
return Byte.valueOf(source.readByte());
}
+
+ @Override
+ public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index ab3df38..24cc98b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.ByteValue;
-public class ByteValueSerializer extends TypeSerializer<ByteValue> {
+public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
}
@Override
+ public ByteValue copy(ByteValue from) {
+ return copy(from, new ByteValue());
+ }
+
+ @Override
public ByteValue copy(ByteValue from, ByteValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
}
@Override
+ public ByteValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new ByteValue(), source);
+ }
+
+ @Override
public ByteValue deserialize(ByteValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 4ef9a56..c46d3a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class CharSerializer extends TypeSerializer<Character> {
+public final class CharSerializer extends TypeSerializerSingleton<Character> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class CharSerializer extends TypeSerializer<Character> {
}
@Override
+ public Character copy(Character from) {
+ return from;
+ }
+
+ @Override
public Character copy(Character from, Character reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class CharSerializer extends TypeSerializer<Character> {
}
@Override
- public Character deserialize(Character reuse, DataInputView source) throws IOException {
+ public Character deserialize(DataInputView source) throws IOException {
return Character.valueOf(source.readChar());
}
+
+ @Override
+ public Character deserialize(Character reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 4946743..71a8ef4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -20,13 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.CharValue;
-
-public class CharValueSerializer extends TypeSerializer<CharValue> {
+public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
private static final long serialVersionUID = 1L;
@@ -47,6 +45,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
public CharValue createInstance() {
return new CharValue();
}
+
+ @Override
+ public CharValue copy(CharValue from) {
+ return copy(from, new CharValue());
+ }
@Override
public CharValue copy(CharValue from, CharValue reuse) {
@@ -63,6 +66,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
public void serialize(CharValue record, DataOutputView target) throws IOException {
record.write(target);
}
+
+ @Override
+ public CharValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new CharValue(), source);
+ }
@Override
public CharValue deserialize(CharValue reuse, DataInputView source) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index fc8f55d..8e09f7c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
-public class DoubleSerializer extends TypeSerializer<Double> {
+public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
private static final long serialVersionUID = 1L;
@@ -50,6 +48,11 @@ public class DoubleSerializer extends TypeSerializer<Double> {
}
@Override
+ public Double copy(Double from) {
+ return from;
+ }
+
+ @Override
public Double copy(Double from, Double reuse) {
return from;
}
@@ -65,9 +68,14 @@ public class DoubleSerializer extends TypeSerializer<Double> {
}
@Override
- public Double deserialize(Double reuse, DataInputView source) throws IOException {
+ public Double deserialize(DataInputView source) throws IOException {
return Double.valueOf(source.readDouble());
}
+
+ @Override
+ public Double deserialize(Double reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index a19f83e..f4c7f37 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.DoubleValue;
-public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
+public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
}
@Override
+ public DoubleValue copy(DoubleValue from) {
+ return copy(from, new DoubleValue());
+ }
+
+ @Override
public DoubleValue copy(DoubleValue from, DoubleValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
}
@Override
+ public DoubleValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new DoubleValue(), source);
+ }
+
+ @Override
public DoubleValue deserialize(DoubleValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index c0c7917..b1a46b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
-public class FloatSerializer extends TypeSerializer<Float> {
+public final class FloatSerializer extends TypeSerializerSingleton<Float> {
private static final long serialVersionUID = 1L;
@@ -50,6 +48,11 @@ public class FloatSerializer extends TypeSerializer<Float> {
}
@Override
+ public Float copy(Float from) {
+ return from;
+ }
+
+ @Override
public Float copy(Float from, Float reuse) {
return from;
}
@@ -65,9 +68,14 @@ public class FloatSerializer extends TypeSerializer<Float> {
}
@Override
- public Float deserialize(Float reuse, DataInputView source) throws IOException {
+ public Float deserialize(DataInputView source) throws IOException {
return Float.valueOf(source.readFloat());
}
+
+ @Override
+ public Float deserialize(Float reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index 5660dea..6ebb268 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.FloatValue;
-public class FloatValueSerializer extends TypeSerializer<FloatValue> {
+public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
}
@Override
+ public FloatValue copy(FloatValue from) {
+ return copy(from, new FloatValue());
+ }
+
+ @Override
public FloatValue copy(FloatValue from, FloatValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
}
@Override
+ public FloatValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new FloatValue(), source);
+ }
+
+ @Override
public FloatValue deserialize(FloatValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
@@ -72,6 +81,6 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.writeDouble(source.readFloat());
+ target.writeFloat(source.readFloat());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index cfdb132..504b41b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -27,9 +27,11 @@ import org.apache.flink.core.memory.DataOutputView;
/**
+ * A serializer for arrays of objects.
+ *
* @param <C> The component type
*/
-public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
+public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
private static final long serialVersionUID = 1L;
@@ -40,7 +42,6 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
private final C[] EMPTY;
-
public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
if (componentClass == null || componentSerializer == null) {
throw new NullPointerException();
@@ -68,7 +69,7 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
}
@Override
- public C[] copy(C[] from, C[] reuse) {
+ public C[] copy(C[] from) {
C[] copy = create(from.length);
for (int i = 0; i < copy.length; i++) {
@@ -77,6 +78,11 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
return copy;
}
+
+ @Override
+ public C[] copy(C[] from, C[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -98,6 +104,24 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
}
@Override
+ public C[] deserialize(DataInputView source) throws IOException {
+ int len = source.readInt();
+
+ C[] array = create(len);
+
+ for (int i = 0; i < len; i++) {
+ boolean isNonNull = source.readBoolean();
+ if (isNonNull) {
+ array[i] = componentSerializer.deserialize(source);
+ } else {
+ array[i] = null;
+ }
+ }
+
+ return array;
+ }
+
+ @Override
public C[] deserialize(C[] reuse, DataInputView source) throws IOException {
int len = source.readInt();
@@ -108,7 +132,13 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
for (int i = 0; i < len; i++) {
boolean isNonNull = source.readBoolean();
if (isNonNull) {
- reuse[i] = componentSerializer.deserialize(componentSerializer.createInstance(), source);
+ C ri = reuse[i];
+ if (ri == null) {
+ ri = componentSerializer.deserialize(source);
+ } else {
+ ri = componentSerializer.deserialize(ri, source);
+ }
+ reuse[i] = ri;
} else {
reuse[i] = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 28192cd..2937b2a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class IntSerializer extends TypeSerializer<Integer> {
+public final class IntSerializer extends TypeSerializerSingleton<Integer> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class IntSerializer extends TypeSerializer<Integer> {
}
@Override
+ public Integer copy(Integer from) {
+ return from;
+ }
+
+ @Override
public Integer copy(Integer from, Integer reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class IntSerializer extends TypeSerializer<Integer> {
}
@Override
- public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+ public Integer deserialize(DataInputView source) throws IOException {
return Integer.valueOf(source.readInt());
}
+
+ @Override
+ public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index 9cf72d9..ec1f345 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.IntValue;
-public class IntValueSerializer extends TypeSerializer<IntValue> {
+public final class IntValueSerializer extends TypeSerializerSingleton<IntValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
}
@Override
+ public IntValue copy(IntValue from) {
+ return copy(from, new IntValue());
+ }
+
+ @Override
public IntValue copy(IntValue from, IntValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
}
@Override
+ public IntValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new IntValue(), source);
+ }
+
+ @Override
public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 2ca2b84..6b25596 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class LongSerializer extends TypeSerializer<Long> {
+public final class LongSerializer extends TypeSerializerSingleton<Long> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class LongSerializer extends TypeSerializer<Long> {
}
@Override
+ public Long copy(Long from) {
+ return from;
+ }
+
+ @Override
public Long copy(Long from, Long reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class LongSerializer extends TypeSerializer<Long> {
}
@Override
- public Long deserialize(Long reuse, DataInputView source) throws IOException {
+ public Long deserialize(DataInputView source) throws IOException {
return Long.valueOf(source.readLong());
}
+
+ @Override
+ public Long deserialize(Long reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index ea45dcc..95caf04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.LongValue;
-public class LongValueSerializer extends TypeSerializer<LongValue> {
+public final class LongValueSerializer extends TypeSerializerSingleton<LongValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
}
@Override
+ public LongValue copy(LongValue from) {
+ return copy(from, new LongValue());
+ }
+
+ @Override
public LongValue copy(LongValue from, LongValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
}
@Override
+ public LongValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new LongValue(), source);
+ }
+
+ @Override
public LongValue deserialize(LongValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index 156732c..c6e7870 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-public class ShortSerializer extends TypeSerializer<Short> {
+public final class ShortSerializer extends TypeSerializerSingleton<Short> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class ShortSerializer extends TypeSerializer<Short> {
}
@Override
+ public Short copy(Short from) {
+ return from;
+ }
+
+ @Override
public Short copy(Short from, Short reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class ShortSerializer extends TypeSerializer<Short> {
}
@Override
- public Short deserialize(Short reuse, DataInputView source) throws IOException {
+ public Short deserialize(DataInputView source) throws IOException {
return Short.valueOf(source.readShort());
}
+
+ @Override
+ public Short deserialize(Short reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index ac46b46..ab58987 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.ShortValue;
-public class ShortValueSerializer extends TypeSerializer<ShortValue> {
+public final class ShortValueSerializer extends TypeSerializerSingleton<ShortValue> {
private static final long serialVersionUID = 1L;
@@ -49,6 +48,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
}
@Override
+ public ShortValue copy(ShortValue from) {
+ return copy(from, new ShortValue());
+ }
+
+ @Override
public ShortValue copy(ShortValue from, ShortValue reuse) {
reuse.setValue(from.getValue());
return reuse;
@@ -65,6 +69,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
}
@Override
+ public ShortValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new ShortValue(), source);
+ }
+
+ @Override
public ShortValue deserialize(ShortValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index d71b521..71221a2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
-public class StringSerializer extends TypeSerializer<String> {
+public final class StringSerializer extends TypeSerializerSingleton<String> {
private static final long serialVersionUID = 1L;
@@ -50,6 +49,11 @@ public class StringSerializer extends TypeSerializer<String> {
}
@Override
+ public String copy(String from) {
+ return from;
+ }
+
+ @Override
public String copy(String from, String reuse) {
return from;
}
@@ -65,9 +69,14 @@ public class StringSerializer extends TypeSerializer<String> {
}
@Override
- public String deserialize(String record, DataInputView source) throws IOException {
+ public String deserialize(DataInputView source) throws IOException {
return StringValue.readString(source);
}
+
+ @Override
+ public String deserialize(String record, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 352330f..c5d5b55 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -20,16 +20,17 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
-public class StringValueSerializer extends TypeSerializer<StringValue> {
+public final class StringValueSerializer extends TypeSerializerSingleton<StringValue> {
private static final long serialVersionUID = 1L;
+ private static final int HIGH_BIT = 0x1 << 7;
+
public static final StringValueSerializer INSTANCE = new StringValueSerializer();
@@ -49,6 +50,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
}
@Override
+ public StringValue copy(StringValue from) {
+ return copy(from, new StringValue());
+ }
+
+ @Override
public StringValue copy(StringValue from, StringValue reuse) {
reuse.setValue(from);
return reuse;
@@ -65,6 +71,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
}
@Override
+ public StringValue deserialize(DataInputView source) throws IOException {
+ return deserialize(new StringValue(), source);
+ }
+
+ @Override
public StringValue deserialize(StringValue reuse, DataInputView source) throws IOException {
reuse.read(source);
return reuse;
@@ -72,6 +83,29 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
- StringValue.copyString(source, target);
+ int len = source.readUnsignedByte();
+ target.writeByte(len);
+
+ if (len >= HIGH_BIT) {
+ int shift = 7;
+ int curr;
+ len = len & 0x7f;
+ while ((curr = source.readUnsignedByte()) >= HIGH_BIT) {
+ target.writeByte(curr);
+ len |= (curr & 0x7f) << shift;
+ shift += 7;
+ }
+ target.writeByte(curr);
+ len |= curr << shift;
+ }
+
+ for (int i = 0; i < len; i++) {
+ int c = source.readUnsignedByte();
+ target.writeByte(c);
+ while (c >= HIGH_BIT) {
+ c = source.readUnsignedByte();
+ target.writeByte(c);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
new file mode 100644
index 0000000..979d5ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
+
+ private static final long serialVersionUID = 8766687317209282373L;
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj != null && obj.getClass() == this.getClass();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index 0093463..e9941a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for boolean arrays.
*/
-public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
+public final class BooleanPrimitiveArraySerializer extends TypeSerializerSingleton<boolean[]>{
private static final long serialVersionUID = 1L;
@@ -52,13 +52,18 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
}
@Override
- public boolean[] copy(boolean[] from, boolean[] reuse) {
+ public boolean[] copy(boolean[] from) {
boolean[] copy = new boolean[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
+ public boolean[] copy(boolean[] from, boolean[] reuse) {
+ return copy(from);
+ }
+
+ @Override
public int getLength() {
return -1;
}
@@ -79,15 +84,20 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
@Override
- public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
+ public boolean[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new boolean[len];
+ boolean[] result = new boolean[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readBoolean();
+ result[i] = source.readBoolean();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index fa0638a..aaf867f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for byte arrays.
*/
-public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
+public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<byte[]>{
private static final long serialVersionUID = 1L;
@@ -51,11 +51,16 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
}
@Override
- public byte[] copy(byte[] from, byte[] reuse) {
+ public byte[] copy(byte[] from) {
byte[] copy = new byte[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public byte[] copy(byte[] from, byte[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -74,13 +79,17 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
target.write(record);
}
-
@Override
- public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+ public byte[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new byte[len];
- source.readFully(reuse);
- return reuse;
+ byte[] result = new byte[len];
+ source.readFully(result);
+ return result;
+ }
+
+ @Override
+ public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index 639d4b6..64632bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for char arrays.
*/
-public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
+public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<char[]>{
private static final long serialVersionUID = 1L;
@@ -52,11 +52,16 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
}
@Override
- public char[] copy(char[] from, char[] reuse) {
+ public char[] copy(char[] from) {
char[] copy = new char[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public char[] copy(char[] from, char[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -77,17 +82,21 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
}
}
-
@Override
- public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
+ public char[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new char[len];
+ char[] result = new char[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readChar();
+ result[i] = source.readChar();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index 2b089a3..846ae74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for double arrays.
*/
-public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
+public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleton<double[]>{
private static final long serialVersionUID = 1L;
@@ -50,15 +50,20 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
public double[] createInstance() {
return EMPTY;
}
-
+
@Override
- public double[] copy(double[] from, double[] reuse) {
+ public double[] copy(double[] from) {
double[] copy = new double[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
+ public double[] copy(double[] from, double[] reuse) {
+ return copy(from);
+ }
+
+ @Override
public int getLength() {
return -1;
}
@@ -77,17 +82,21 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
}
}
-
@Override
- public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
+ public double[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new double[len];
+ double[] result = new double[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readDouble();
+ result[i] = source.readDouble();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index 897292e..8f42ac8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for float arrays.
*/
-public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
+public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton<float[]>{
private static final long serialVersionUID = 1L;
@@ -52,11 +52,16 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
}
@Override
- public float[] copy(float[] from, float[] reuse) {
+ public float[] copy(float[] from) {
float[] copy = new float[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public float[] copy(float[] from, float[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -77,17 +82,21 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
}
}
-
@Override
- public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
+ public float[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new float[len];
+ float[] result = new float[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readFloat();
+ result[i] = source.readFloat();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index aeaf35e..2ab056c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for int arrays.
*/
-public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
+public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
private static final long serialVersionUID = 1L;
@@ -52,11 +52,16 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
}
@Override
- public int[] copy(int[] from, int[] reuse) {
+ public int[] copy(int[] from) {
int[] copy = new int[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public int[] copy(int[] from, int[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -77,17 +82,21 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
}
}
-
@Override
- public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
+ public int[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new int[len];
+ int[] result = new int[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readInt();
+ result[i] = source.readInt();
}
- return reuse;
+ return result;
+ }
+
+ @Override
+ public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index 953e8ff..5d34dfe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
*/
-public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
+public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<long[]>{
private static final long serialVersionUID = 1L;
@@ -52,10 +52,15 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
}
@Override
+ public long[] copy(long[] from) {
+ long[] result = new long[from.length];
+ System.arraycopy(from, 0, result, 0, from.length);
+ return result;
+ }
+
+ @Override
public long[] copy(long[] from, long[] reuse) {
- reuse = new long[from.length];
- System.arraycopy(from, 0, reuse, 0, from.length);
- return reuse;
+ return copy(from);
}
@Override
@@ -77,17 +82,21 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
}
}
-
@Override
- public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
+ public long[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new long[len];
+ long[] array = new long[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readLong();
+ array[i] = source.readLong();
}
- return reuse;
+ return array;
+ }
+
+ @Override
+ public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index 014fd05..2f37033 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
- * A serializer for long arrays.
+ * A serializer for short arrays.
*/
-public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
+public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton<short[]>{
private static final long serialVersionUID = 1L;
@@ -52,11 +52,16 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
}
@Override
- public short[] copy(short[] from, short[] reuse) {
+ public short[] copy(short[] from) {
short[] copy = new short[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
+
+ @Override
+ public short[] copy(short[] from, short[] reuse) {
+ return copy(from);
+ }
@Override
public int getLength() {
@@ -77,17 +82,21 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
}
}
-
@Override
- public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
+ public short[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new short[len];
+ short[] array = new short[len];
for (int i = 0; i < len; i++) {
- reuse[i] = source.readShort();
+ array[i] = source.readShort();
}
- return reuse;
+ return array;
+ }
+
+ @Override
+ public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index c6c1826..d5ab030 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
@@ -29,7 +29,7 @@ import org.apache.flink.types.StringValue;
/**
* A serializer for String arrays. Specialized for efficiency.
*/
-public class StringArraySerializer extends TypeSerializer<String[]>{
+public final class StringArraySerializer extends TypeSerializerSingleton<String[]>{
private static final long serialVersionUID = 1L;
@@ -54,10 +54,15 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
}
@Override
+ public String[] copy(String[] from) {
+ String[] target = new String[from.length];
+ System.arraycopy(from, 0, target, 0, from.length);
+ return target;
+ }
+
+ @Override
public String[] copy(String[] from, String[] reuse) {
- reuse = new String[from.length];
- System.arraycopy(from, 0, reuse, 0, from.length);
- return reuse;
+ return copy(from);
}
@Override
@@ -65,7 +70,6 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
return -1;
}
-
@Override
public void serialize(String[] record, DataOutputView target) throws IOException {
if (record == null) {
@@ -79,17 +83,21 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
}
}
-
@Override
- public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
+ public String[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
- reuse = new String[len];
+ String[] array = new String[len];
for (int i = 0; i < len; i++) {
- reuse[i] = StringValue.readString(source);
+ array[i] = StringValue.readString(source);
}
- return reuse;
+ return array;
+ }
+
+ @Override
+ public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
+ return deserialize(source);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
index 7e8762e..7b72e89 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
@@ -67,12 +67,16 @@ public final class RecordSerializer extends TypeSerializer<Record> {
}
@Override
+ public Record copy(Record from) {
+ return from.createCopy();
+ }
+
+ @Override
public Record copy(Record from, Record reuse) {
from.copyTo(reuse);
return reuse;
}
-
@Override
public int getLength() {
return -1;
@@ -86,6 +90,11 @@ public final class RecordSerializer extends TypeSerializer<Record> {
}
@Override
+ public Record deserialize(DataInputView source) throws IOException {
+ return deserialize(new Record(), source);
+ }
+
+ @Override
public Record deserialize(Record target, DataInputView source) throws IOException {
target.deserialize(source);
return target;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 6ca3f6c..d509284 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -89,6 +89,24 @@ public abstract class SerializerTestBase<T> {
}
@Test
+ public void testCopy() {
+ try {
+ TypeSerializer<T> serializer = getSerializer();
+ T[] testData = getData();
+
+ for (T datum : testData) {
+ T copy = serializer.copy(datum);
+ deepEquals("Copied element is not equal to the original element.", datum, copy);
+ }
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Exception in test: " + e.getMessage());
+ }
+ }
+
+ @Test
public void testCopyIntoNewElements() {
try {
TypeSerializer<T> serializer = getSerializer();
@@ -184,7 +202,36 @@ public abstract class SerializerTestBase<T> {
}
@Test
- public void testSerializeAsSequence() {
+ public void testSerializeAsSequenceNoReuse() {
+ try {
+ TypeSerializer<T> serializer = getSerializer();
+ T[] testData = getData();
+
+ TestOutputView out = new TestOutputView();
+ for (T value : testData) {
+ serializer.serialize(value, out);
+ }
+
+ TestInputView in = out.getInputView();
+
+ int num = 0;
+ while (in.available() > 0) {
+ T deserialized = serializer.deserialize(in);
+ deepEquals("Deserialized value if wrong.", testData[num], deserialized);
+ num++;
+ }
+
+ assertEquals("Wrong number of elements deserialized.", testData.length, num);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Exception in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerializeAsSequenceReusingValues() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index 2eb52c0..5b63633 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -69,11 +69,13 @@ public class
public void testAll() {
testInstantiate();
testGetLength();
+ testCopy();
testCopyIntoNewElements();
testCopyIntoReusedElements();
testSerializeIndividually();
testSerializeIndividuallyReusingValues();
- testSerializeAsSequence();
+ testSerializeAsSequenceNoReuse();
+ testSerializeAsSequenceReusingValues();
testSerializedCopyIndividually();
testSerializedCopyAsSequence();
testSerializabilityAndEquals();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
new file mode 100644
index 0000000..43f1a57
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.BooleanValue;
+
+/**
+ * A test for the {@link BooleanValueSerializer}.
+ */
+public class BooleanValueSerializerTest extends SerializerTestBase<BooleanValue> {
+
+ @Override
+ protected TypeSerializer<BooleanValue> createSerializer() {
+ return new BooleanValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 1;
+ }
+
+ @Override
+ protected Class<BooleanValue> getTypeClass() {
+ return BooleanValue.class;
+ }
+
+ @Override
+ protected BooleanValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+
+ return new BooleanValue[] {new BooleanValue(true), new BooleanValue(false),
+ new BooleanValue(rnd.nextBoolean()),
+ new BooleanValue(rnd.nextBoolean()),
+ new BooleanValue(rnd.nextBoolean())};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
new file mode 100644
index 0000000..0e16629
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+/**
+ * A test for the {@link ByteValueSerializer}.
+ */
+public class ByteValueSerializerTest extends SerializerTestBase<ByteValue> {
+
+ @Override
+ protected TypeSerializer<ByteValue> createSerializer() {
+ return new ByteValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 1;
+ }
+
+ @Override
+ protected Class<ByteValue> getTypeClass() {
+ return ByteValue.class;
+ }
+
+ @Override
+ protected ByteValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ byte byteArray[] = new byte[1];
+ rnd.nextBytes(byteArray);
+
+ return new ByteValue[] {new ByteValue((byte) 0), new ByteValue((byte) 1), new ByteValue((byte) -1),
+ new ByteValue(Byte.MAX_VALUE), new ByteValue(Byte.MIN_VALUE),
+ new ByteValue(byteArray[0]), new ByteValue((byte) -byteArray[0])};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
new file mode 100644
index 0000000..ac83666
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+/**
+ * A test for the {@link CharValueSerializer}.
+ */
+public class CharValueSerializerTest extends SerializerTestBase<CharValue> {
+
+ @Override
+ protected TypeSerializer<CharValue> createSerializer() {
+ return new CharValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 2;
+ }
+
+ @Override
+ protected Class<CharValue> getTypeClass() {
+ return CharValue.class;
+ }
+
+ @Override
+ protected CharValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ int rndInt = rnd.nextInt((int) Character.MAX_VALUE);
+
+ return new CharValue[] {new CharValue('a'), new CharValue('@'), new CharValue('ä'),
+ new CharValue('1'), new CharValue((char) rndInt),
+ new CharValue(Character.MAX_VALUE), new CharValue(Character.MIN_VALUE)};
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
new file mode 100644
index 0000000..c0a2b24
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+/**
+ * A test for the {@link DoubleValueSerializer}.
+ */
+public class DoubleValueSerializerTest extends SerializerTestBase<DoubleValue> {
+
+ @Override
+ protected TypeSerializer<DoubleValue> createSerializer() {
+ return new DoubleValueSerializer();
+ }
+
+ @Override
+ protected int getLength() {
+ return 8;
+ }
+
+ @Override
+ protected Class<DoubleValue> getTypeClass() {
+ return DoubleValue.class;
+ }
+
+ @Override
+ protected DoubleValue[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ Double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
+
+ return new DoubleValue[] {new DoubleValue(0), new DoubleValue(1), new DoubleValue(-1),
+ new DoubleValue(Double.MAX_VALUE), new DoubleValue(Double.MIN_VALUE),
+ new DoubleValue(rndDouble), new DoubleValue(-rndDouble),
+ new DoubleValue(Double.NaN),
+ new DoubleValue(Double.NEGATIVE_INFINITY), new DoubleValue(Double.POSITIVE_INFINITY)};
+ }
+}