You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/12/16 15:36:44 UTC

flink git commit: [FLINK-5011] [types] TraversableSerializer does not perform a deep copy of the elements it is traversing

Repository: flink
Updated Branches:
  refs/heads/master 7957b2df5 -> 48ef46a4d


[FLINK-5011] [types] TraversableSerializer does not perform a deep copy of the elements it is traversing

This closes #2952.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48ef46a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48ef46a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48ef46a4

Branch: refs/heads/master
Commit: 48ef46a4d33fca4033a293e8b0ff3a386194f69f
Parents: 7957b2d
Author: twalthr <tw...@apache.org>
Authored: Tue Dec 6 15:07:45 2016 +0100
Committer: twalthr <tw...@apache.org>
Committed: Fri Dec 16 16:31:56 2016 +0100

----------------------------------------------------------------------
 .../scala/typeutils/TraversableSerializer.scala |  4 +-
 .../ScalaSpecialTypesSerializerTest.scala       | 31 ++++++-
 .../runtime/TraversableSerializerTest.scala     | 92 +++++++++-----------
 3 files changed, 73 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48ef46a4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 7d14dc1..d1b9085 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -58,14 +58,14 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     cbf().result()
   }
 
-  override def isImmutableType: Boolean = true
+  override def isImmutableType: Boolean = false
 
   override def getLength: Int = -1
 
   override def copy(from: T): T = {
     val builder = cbf()
     builder.sizeHint(from.size)
-    from foreach { e => builder += e }
+    from foreach { e => builder += elementSerializer.copy(e) }
     builder.result()
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48ef46a4/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
index 155160c..555359f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/ScalaSpecialTypesSerializerTest.scala
@@ -22,10 +22,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.typeutils.EnumValueTypeInfo
 import org.junit.Assert._
 import org.junit.{Assert, Test}
 
+import scala.collection.{SortedMap, SortedSet}
 import scala.util.{Failure, Success}
 
 class ScalaSpecialTypesSerializerTest {
@@ -92,6 +92,35 @@ class ScalaSpecialTypesSerializerTest {
     runTests(testData)
   }
 
+  @Test
+  def testStringArray(): Unit = {
+    val testData = Array(Array("Foo", "Bar"), Array("Hello"))
+    runTests(testData)
+  }
+
+  @Test
+  def testIntArray(): Unit = {
+    val testData = Array(Array(1,3,3,7), Array(4,7))
+    runTests(testData)
+  }
+
+  @Test
+  def testArrayWithCaseClass(): Unit = {
+    val testData = Array(Array((1, "String"), (2, "Foo")), Array((4, "String"), (3, "Foo")))
+    runTests(testData)
+  }
+
+  @Test
+  def testSortedMap(): Unit = {
+    val testData = Array(SortedMap("Hello" -> 1, "World" -> 2), SortedMap("Foo" -> 42))
+    runTests(testData)
+  }
+
+  @Test
+  def testSortedSet(): Unit = {
+    val testData = Array(SortedSet(1,2,3), SortedSet(2,3))
+    runTests(testData)
+  }
 
   private final def runTests[T : TypeInformation](instances: Array[T]) {
     try {

http://git-wip-us.apache.org/repos/asf/flink/blob/48ef46a4/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
index 65648b6..e177e7c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TraversableSerializerTest.scala
@@ -18,20 +18,21 @@
 package org.apache.flink.api.scala.runtime
 
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.functions.InvalidTypesException
-import org.junit.Assert._
-
-import org.apache.flink.api.common.typeutils.{TypeSerializer, SerializerTestInstance}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.junit.{Ignore, Assert, Test}
-
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.TraversableSerializer
+import org.junit.Assert._
+import org.junit.{Assert, Ignore, Test}
 
-import scala.collection.immutable.{BitSet, SortedSet, LinearSeq}
+import scala.collection.immutable.{BitSet, LinearSeq, SortedSet}
 import scala.collection.{SortedMap, mutable}
 
 class TraversableSerializerTest {
 
+  // Note: SortedMap and SortedSet are serialized with Kryo
+
   @Test
   def testSeq(): Unit = {
     val testData = Array(Seq(1,2,3), Seq(2,3))
@@ -57,26 +58,12 @@ class TraversableSerializerTest {
   }
 
   @Test
-  def testSortedMap(): Unit = {
-    // SortedSet is not supported right now.
-    val testData = Array(SortedMap("Hello" -> 1, "World" -> 2), SortedMap("Foo" -> 42))
-    runTests(testData)
-  }
-
-  @Test
   def testSet(): Unit = {
     val testData = Array(Set(1,2,3,3), Set(2,3))
     runTests(testData)
   }
 
   @Test
-  def testSortedSet(): Unit = {
-    // SortedSet is not supported right now.
-    val testData = Array(SortedSet(1,2,3), SortedSet(2,3))
-    runTests(testData)
-  }
-
-  @Test
   def testBitSet(): Unit = {
     val testData = Array(BitSet(1,2,3,4), BitSet(2,3,2))
     runTests(testData)
@@ -89,24 +76,6 @@ class TraversableSerializerTest {
   }
 
   @Test
-  def testStringArray(): Unit = {
-    val testData = Array(Array("Foo", "Bar"), Array("Hello"))
-    runTests(testData)
-  }
-
-  @Test
-  def testIntArray(): Unit = {
-    val testData = Array(Array(1,3,3,7), Array(4,7))
-    runTests(testData)
-  }
-
-  @Test
-  def testArrayWithCaseClass(): Unit = {
-    val testData = Array(Array((1, "String"), (2, "Foo")), Array((4, "String"), (3, "Foo")))
-    runTests(testData)
-  }
-
-  @Test
   def testWithCaseClass(): Unit = {
     val testData = Array(Seq((1, "String"), (2, "Foo")), Seq((4, "String"), (3, "Foo")))
     runTests(testData)
@@ -124,31 +93,27 @@ class TraversableSerializerTest {
     // have a typeClass of Object, and therefore not deserialize the elements correctly.
     // It does work when used in a Job, though. Because the Objects get cast to
     // the correct type in the user function.
-    val testData = Array(Seq(1,1L,1d,true,"Hello"), Seq(2,2L,2d,false,"Ciao"))
+    val testData = Array(Seq(1, 1L, 1d, true, "Hello"), Seq(2, 2L, 2d, false, "Ciao"))
     runTests(testData)
   }
 
-
-
   private final def runTests[T : TypeInformation](instances: Array[T]) {
     try {
       val typeInfo = implicitly[TypeInformation[T]]
       val serializer = typeInfo.createSerializer(new ExecutionConfig)
       val typeClass = typeInfo.getTypeClass
-      val test =
-        new ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, -1, instances)
+      val test = new TraversableSerializerTestInstance[T](serializer, typeClass, -1, instances)
       test.testAll()
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         Assert.fail(e.getMessage)
-      }
     }
   }
 }
 
-class Pojo(val name: String, val count: Int) {
+class Pojo(var name: String, var count: Int) {
   def this() = this("", -1)
 
   override def equals(other: Any): Boolean = {
@@ -159,12 +124,38 @@ class Pojo(val name: String, val count: Int) {
   }
 }
 
-class ScalaCollectionSerializerTestInstance[T](
+class TraversableSerializerTestInstance[T](
     serializer: TypeSerializer[T],
     typeClass: Class[T],
     length: Int,
     testData: Array[T])
-  extends SerializerTestInstance[T](serializer, typeClass, length, testData: _*) {
+  extends ScalaSpecialTypesSerializerTestInstance[T](serializer, typeClass, length, testData) {
+
+  @Test
+  override def testAll(): Unit = {
+    super.testAll()
+    testTraversableDeepCopy()
+  }
+
+  @Test
+  def testTraversableDeepCopy(): Unit = {
+    val serializer = getSerializer
+    val elementSerializer = serializer.asInstanceOf[TraversableSerializer[_, _]].elementSerializer
+    val data = getTestData
+
+    // check for deep copy if type is immutable and not serialized with Kryo
+    // elements of traversable should not have reference equality
+    if (!elementSerializer.isImmutableType && !elementSerializer.isInstanceOf[KryoSerializer[_]]) {
+      data.foreach { datum =>
+        val original = datum.asInstanceOf[Traversable[_]].toIterable
+        val copy = serializer.copy(datum).asInstanceOf[Traversable[_]].toIterable
+        copy.zip(original).foreach { case (c: AnyRef, o: AnyRef) =>
+          assertTrue("Copy of mutable element has reference equality.", c ne o)
+        case _ => // ok
+        }
+      }
+    }
+  }
 
   @Test
   override def testInstantiate(): Unit = {
@@ -179,11 +170,10 @@ class ScalaCollectionSerializerTestInstance[T](
       // assertEquals("Type of the instantiated object is wrong.", tpe, instance.getClass)
     }
     catch {
-      case e: Exception => {
+      case e: Exception =>
         System.err.println(e.getMessage)
         e.printStackTrace()
         fail("Exception in test: " + e.getMessage)
-      }
     }
   }