You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/13 13:02:32 UTC

flink git commit: [FLINK-7484] Perform proper deep copy in CaseClassSerializer.duplicate()

Repository: flink
Updated Branches:
  refs/heads/master 85b2f2706 -> 90be5774e


[FLINK-7484] Perform proper deep copy in CaseClassSerializer.duplicate()

This also adds a test that verifies the deep copy.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90be5774
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90be5774
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90be5774

Branch: refs/heads/master
Commit: 90be5774e481af87355b9f475562180923039a93
Parents: 85b2f27
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 13 13:11:42 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 13 15:01:38 2017 +0200

----------------------------------------------------------------------
 .../typeutils/runtime/TupleSerializerBase.java  |  8 ++++-
 .../scala/typeutils/CaseClassSerializer.scala   |  2 +-
 .../api/scala/runtime/TupleSerializerTest.scala | 33 ++++++++++++++++----
 3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index f12dcd9..3fb7def 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
@@ -43,7 +44,7 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
 	protected final Class<T> tupleClass;
 
-	protected final TypeSerializer<Object>[] fieldSerializers;
+	protected TypeSerializer<Object>[] fieldSerializers;
 
 	protected final int arity;
 
@@ -183,4 +184,9 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 	}
 
 	protected abstract TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers);
+
+	@VisibleForTesting
+	public TypeSerializer<Object>[] getFieldSerializers() {
+		return fieldSerializers;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index c8222d6..c059913 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -48,7 +48,7 @@ abstract class CaseClassSerializer[T <: Product](
     val result = super.clone().asInstanceOf[CaseClassSerializer[T]]
 
     // achieve a deep copy by duplicating the field serializers
-    result.fieldSerializers.transform(_.duplicate())
+    result.fieldSerializers = result.fieldSerializers.map(_.duplicate())
     result.fields = null
     result.instanceCreationFailed = false
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index b210c99..01acc7c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -20,23 +20,44 @@ package org.apache.flink.api.scala.runtime
 import java.util
 import java.util.Random
 
-import org.apache.flink.api.scala._
 import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.util.StringUtils
-
 import org.joda.time.LocalDate
-
-import org.junit.Assert
-import org.junit.Test
+import org.junit.Assert._
+import org.junit.{Assert, Test}
 
 import scala.collection.JavaConverters._
 
 class TupleSerializerTest {
 
   @Test
+  def testProperDeepCopy(): Unit = {
+    val tpe = createTypeInformation[((String, Int), (Int, String))]
+
+    val originalSerializer =
+      tpe.createSerializer(new ExecutionConfig)
+        .asInstanceOf[CaseClassSerializer[((String, Int), (Int, String))]]
+    val duplicateSerializer = originalSerializer.duplicate()
+
+    duplicateSerializer.getFieldSerializers
+
+    // the list of child serializers must be duplicated
+    assertTrue(duplicateSerializer.getFieldSerializers ne originalSerializer.getFieldSerializers)
+
+    // each of the child serializers (which are themselves CaseClassSerializers) must be duplicated
+    assertTrue(
+      duplicateSerializer.getFieldSerializers()(0) ne originalSerializer.getFieldSerializers()(0))
+
+    assertTrue(
+      duplicateSerializer.getFieldSerializers()(1) ne originalSerializer.getFieldSerializers()(1))
+  }
+
+  @Test
   def testTuple1Int(): Unit = {
     val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue),
       Tuple1(Int.MinValue))