You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/13 13:02:32 UTC
flink git commit: [FLINK-7484] Perform proper deep copy in
CaseClassSerializer.duplicate()
Repository: flink
Updated Branches:
refs/heads/master 85b2f2706 -> 90be5774e
[FLINK-7484] Perform proper deep copy in CaseClassSerializer.duplicate()
This also adds a test that verifies the deep copy.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90be5774
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90be5774
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90be5774
Branch: refs/heads/master
Commit: 90be5774e481af87355b9f475562180923039a93
Parents: 85b2f27
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 13 13:11:42 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 13 15:01:38 2017 +0200
----------------------------------------------------------------------
.../typeutils/runtime/TupleSerializerBase.java | 8 ++++-
.../scala/typeutils/CaseClassSerializer.scala | 2 +-
.../api/scala/runtime/TupleSerializerTest.scala | 33 ++++++++++++++++----
3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index f12dcd9..3fb7def 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
@@ -43,7 +44,7 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
protected final Class<T> tupleClass;
- protected final TypeSerializer<Object>[] fieldSerializers;
+ protected TypeSerializer<Object>[] fieldSerializers;
protected final int arity;
@@ -183,4 +184,9 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
}
protected abstract TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers);
+
+ @VisibleForTesting
+ public TypeSerializer<Object>[] getFieldSerializers() {
+ return fieldSerializers;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index c8222d6..c059913 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -48,7 +48,7 @@ abstract class CaseClassSerializer[T <: Product](
val result = super.clone().asInstanceOf[CaseClassSerializer[T]]
// achieve a deep copy by duplicating the field serializers
- result.fieldSerializers.transform(_.duplicate())
+ result.fieldSerializers = result.fieldSerializers.map(_.duplicate())
result.fields = null
result.instanceCreationFailed = false
http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index b210c99..01acc7c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -20,23 +20,44 @@ package org.apache.flink.api.scala.runtime
import java.util
import java.util.Random
-import org.apache.flink.api.scala._
import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
import org.apache.flink.util.StringUtils
-
import org.joda.time.LocalDate
-
-import org.junit.Assert
-import org.junit.Test
+import org.junit.Assert._
+import org.junit.{Assert, Test}
import scala.collection.JavaConverters._
class TupleSerializerTest {
@Test
+ def testProperDeepCopy(): Unit = {
+ val tpe = createTypeInformation[((String, Int), (Int, String))]
+
+ val originalSerializer =
+ tpe.createSerializer(new ExecutionConfig)
+ .asInstanceOf[CaseClassSerializer[((String, Int), (Int, String))]]
+ val duplicateSerializer = originalSerializer.duplicate()
+
+ duplicateSerializer.getFieldSerializers
+
+ // the list of child serializers must be duplicated
+ assertTrue(duplicateSerializer.getFieldSerializers ne originalSerializer.getFieldSerializers)
+
+ // each of the child serializers (which are themselves CaseClassSerializers) must be duplicated
+ assertTrue(
+ duplicateSerializer.getFieldSerializers()(0) ne originalSerializer.getFieldSerializers()(0))
+
+ assertTrue(
+ duplicateSerializer.getFieldSerializers()(1) ne originalSerializer.getFieldSerializers()(1))
+ }
+
+ @Test
def testTuple1Int(): Unit = {
val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue),
Tuple1(Int.MinValue))