Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/18 14:27:45 UTC

[2/2] flink git commit: [FLINK-5484] [serialization] Add test for registered Kryo types

[FLINK-5484] [serialization] Add test for registered Kryo types
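
The gist of the new test: the checked-in file flink_11-kryo_registrations (presumably the
default registrations as of Flink 1.1) pins each Kryo registration ID to a class name, and
the test asserts that a freshly created Kryo instance still maps every ID to the same class.
Below is a minimal, standalone sketch of that check, not part of the commit; the object name
KryoRegistrationCheck is illustrative, the actual test lives in
KryoGenericTypeSerializerTest.scala in the diff:

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer

    import scala.io.Source

    object KryoRegistrationCheck {
      def main(args: Array[String]): Unit = {
        // Load the previously recorded registrations: one "id,class" entry per line.
        val stream = Thread.currentThread().getContextClassLoader
          .getResourceAsStream("flink_11-kryo_registrations")
        val previous = Source.fromInputStream(stream).getLines().map { line =>
          val Array(id, clazz) = line.split(",")
          id.toInt -> clazz
        }.toMap

        // Compare against the default registrations of a freshly created Kryo instance.
        val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo
        for (i <- 0 until kryo.getNextRegistrationId) {
          val reg = kryo.getRegistration(i)
          val expected = previous.getOrElse(reg.getId,
            sys.error(s"Unexpected registration ID ${reg.getId}"))
          assert(expected == reg.getType.getName,
            s"ID ${reg.getId}: expected $expected but found ${reg.getType.getName}")
        }
      }
    }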


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fddae8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fddae8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fddae8d

Branch: refs/heads/master
Commit: 8fddae8da689102a35dd7b234c124591f9c79ab6
Parents: 8f4139a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Jan 17 19:10:33 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Jan 18 15:27:16 2017 +0100

----------------------------------------------------------------------
 .../test/resources/flink_11-kryo_registrations  |  86 ++++++++++++++++
 .../runtime/KryoGenericTypeSerializerTest.scala | 100 ++++++++++++++++++-
 pom.xml                                         |   1 +
 3 files changed, 183 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8fddae8d/flink-tests/src/test/resources/flink_11-kryo_registrations
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/flink_11-kryo_registrations b/flink-tests/src/test/resources/flink_11-kryo_registrations
new file mode 100644
index 0000000..7000e62
--- /dev/null
+++ b/flink-tests/src/test/resources/flink_11-kryo_registrations
@@ -0,0 +1,86 @@
+0,int
+1,java.lang.String
+2,float
+3,boolean
+4,byte
+5,char
+6,short
+7,long
+8,double
+9,void
+10,scala.collection.convert.Wrappers$SeqWrapper
+11,scala.collection.convert.Wrappers$IteratorWrapper
+12,scala.collection.convert.Wrappers$MapWrapper
+13,scala.collection.convert.Wrappers$JListWrapper
+14,scala.collection.convert.Wrappers$JMapWrapper
+15,scala.Some
+16,scala.util.Left
+17,scala.util.Right
+18,scala.collection.immutable.Vector
+19,scala.collection.immutable.Set$Set1
+20,scala.collection.immutable.Set$Set2
+21,scala.collection.immutable.Set$Set3
+22,scala.collection.immutable.Set$Set4
+23,scala.collection.immutable.HashSet$HashTrieSet
+24,scala.collection.immutable.Map$Map1
+25,scala.collection.immutable.Map$Map2
+26,scala.collection.immutable.Map$Map3
+27,scala.collection.immutable.Map$Map4
+28,scala.collection.immutable.HashMap$HashTrieMap
+29,scala.collection.immutable.Range$Inclusive
+30,scala.collection.immutable.NumericRange$Inclusive
+31,scala.collection.immutable.NumericRange$Exclusive
+32,scala.collection.mutable.BitSet
+33,scala.collection.mutable.HashMap
+34,scala.collection.mutable.HashSet
+35,scala.collection.convert.Wrappers$IterableWrapper
+36,scala.Tuple1
+37,scala.Tuple2
+38,scala.Tuple3
+39,scala.Tuple4
+40,scala.Tuple5
+41,scala.Tuple6
+42,scala.Tuple7
+43,scala.Tuple8
+44,scala.Tuple9
+45,scala.Tuple10
+46,scala.Tuple11
+47,scala.Tuple12
+48,scala.Tuple13
+49,scala.Tuple14
+50,scala.Tuple15
+51,scala.Tuple16
+52,scala.Tuple17
+53,scala.Tuple18
+54,scala.Tuple19
+55,scala.Tuple20
+56,scala.Tuple21
+57,scala.Tuple22
+58,scala.Tuple1$mcJ$sp
+59,scala.Tuple1$mcI$sp
+60,scala.Tuple1$mcD$sp
+61,scala.Tuple2$mcJJ$sp
+62,scala.Tuple2$mcJI$sp
+63,scala.Tuple2$mcJD$sp
+64,scala.Tuple2$mcIJ$sp
+65,scala.Tuple2$mcII$sp
+66,scala.Tuple2$mcID$sp
+67,scala.Tuple2$mcDJ$sp
+68,scala.Tuple2$mcDI$sp
+69,scala.Tuple2$mcDD$sp
+70,scala.Symbol
+71,scala.reflect.ClassTag
+72,scala.runtime.BoxedUnit
+73,java.util.Arrays$ArrayList
+74,java.util.BitSet
+75,java.util.PriorityQueue
+76,java.util.regex.Pattern
+77,java.sql.Date
+78,java.sql.Time
+79,java.sql.Timestamp
+80,java.net.URI
+81,java.net.InetSocketAddress
+82,java.util.UUID
+83,java.util.Locale
+84,java.text.SimpleDateFormat
+85,org.apache.avro.generic.GenericData$Array

http://git-wip-us.apache.org/repos/asf/flink/blob/8fddae8d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index 08a0a96..e001799 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -17,17 +17,19 @@
  */
 package org.apache.flink.api.scala.runtime
 
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Input, Output}
+import java.io._
 
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Kryo, Serializer}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeutils.SerializerTestInstance
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
-
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.joda.time.LocalDate
-
 import org.junit.Test
 
+import scala.collection.mutable
+import scala.io.Source
 import scala.reflect._
 
 class KryoGenericTypeSerializerTest {
@@ -146,6 +148,96 @@ class KryoGenericTypeSerializerTest {
     runTests(list)
   }
 
+  /**
+    * Tests that the registered classes in Kryo did not change.
+    *
+    * Once we have proper serializer versioning, this test will become obsolete.
+    * But currently a change in the serializers can break savepoint backwards
+    * compatibility between Flink versions.
+    */
+  @Test
+  def testDefaultKryoRegisteredClassesDidNotChange(): Unit = {
+    // Previous registration (id => registered class (Class#getName))
+    val previousRegistrations: mutable.HashMap[Int, String] = mutable.HashMap[Int, String]()
+
+    val stream = Thread.currentThread().getContextClassLoader()
+      .getResourceAsStream("flink_11-kryo_registrations")
+    Source.fromInputStream(stream).getLines().foreach{
+      line =>
+        val Array(id, registeredClass) = line.split(",")
+        previousRegistrations.put(id.toInt, registeredClass)
+    }
+
+    // Get Kryo and verify that the registered IDs and types in
+    // Kryo have not changed compared to the provided registrations
+    // file.
+    val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo
+    val nextId = kryo.getNextRegistrationId
+    for (i <- 0 until nextId) {
+      val registration = kryo.getRegistration(i)
+
+      previousRegistrations.get(registration.getId) match {
+        case None => throw new IllegalStateException(s"Expected no entry with ID " +
+          s"${registration.getId}, but got one for type ${registration.getType.getName}. This " +
+          s"can lead to registered user types being deserialized with the wrong serializer when " +
+          s"restoring a savepoint.")
+        case Some(registeredClass) =>
+          if (registeredClass != registration.getType.getName) {
+            throw new IllegalStateException(s"Expected type ${registration.getType.getName} with " +
+              s"ID ${registration.getId}, but got $registeredClass.")
+          }
+      }
+    }
+
+    // Verify the number of registrations (required to detect if the current
+    // number of registrations is less than before).
+    if (previousRegistrations.size != nextId) {
+      throw new IllegalStateException(s"Number of registered classes changed (previously " +
+        s"${previousRegistrations.size}, but now $nextId). This can lead to registered user " +
+        s"types being deserialized with the wrong serializer when restoring a savepoint.")
+    }
+  }
+
+  /**
+    * Creates a Kryo serializer and writes the default registrations out to a
+    * comma-separated file with one entry per line:
+    *
+    * id,class
+    *
+    * The produced file is used to check that the registered IDs don't change
+    * in future Flink versions.
+    *
+    * This method is not used in the tests, but documents how the test file
+    * has been created and can be used to re-create it if needed.
+    *
+    * @param filePath File path to write registrations to
+    */
+  private def writeDefaultKryoRegistrations(filePath: String) = {
+    val file = new File(filePath)
+    if (file.exists()) {
+      file.delete()
+    }
+
+    val writer = new BufferedWriter(new FileWriter(file))
+
+    try {
+      val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo
+
+      val nextId = kryo.getNextRegistrationId
+      for (i <- 0 until nextId) {
+        val registration = kryo.getRegistration(i)
+        val str = registration.getId + "," + registration.getType.getName
+        writer.write(str, 0, str.length)
+        writer.newLine()
+      }
+
+      println(s"Created file with registrations at $file.")
+    } finally {
+      writer.close()
+    }
+  }
+
+
   case class ComplexType(id: String, number: Int, values: List[Int]){
     override def equals(obj: Any): Boolean ={
       if(obj != null && obj.isInstanceOf[ComplexType]){

http://git-wip-us.apache.org/repos/asf/flink/blob/8fddae8d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6c74d6c..5f27ee5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -881,6 +881,7 @@ under the License.
 
 						<!-- Test Data. -->
 						<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
+						<exclude>flink-tests/src/test/resources/flink_11-kryo_registrations</exclude>
 						<exclude>flink-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude>
 						<exclude>out/test/flink-avro/avro/user.avsc</exclude>
 						<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>