You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/18 14:27:45 UTC
[2/2] flink git commit: [FLINK-5484] [serialization] Add test for
registered Kryo types
[FLINK-5484] [serialization] Add test for registered Kryo types
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fddae8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fddae8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fddae8d
Branch: refs/heads/master
Commit: 8fddae8da689102a35dd7b234c124591f9c79ab6
Parents: 8f4139a
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Jan 17 19:10:33 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Jan 18 15:27:16 2017 +0100
----------------------------------------------------------------------
.../test/resources/flink_11-kryo_registrations | 86 ++++++++++++++++
.../runtime/KryoGenericTypeSerializerTest.scala | 100 ++++++++++++++++++-
pom.xml | 1 +
3 files changed, 183 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8fddae8d/flink-tests/src/test/resources/flink_11-kryo_registrations
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/flink_11-kryo_registrations b/flink-tests/src/test/resources/flink_11-kryo_registrations
new file mode 100644
index 0000000..7000e62
--- /dev/null
+++ b/flink-tests/src/test/resources/flink_11-kryo_registrations
@@ -0,0 +1,86 @@
+0,int
+1,java.lang.String
+2,float
+3,boolean
+4,byte
+5,char
+6,short
+7,long
+8,double
+9,void
+10,scala.collection.convert.Wrappers$SeqWrapper
+11,scala.collection.convert.Wrappers$IteratorWrapper
+12,scala.collection.convert.Wrappers$MapWrapper
+13,scala.collection.convert.Wrappers$JListWrapper
+14,scala.collection.convert.Wrappers$JMapWrapper
+15,scala.Some
+16,scala.util.Left
+17,scala.util.Right
+18,scala.collection.immutable.Vector
+19,scala.collection.immutable.Set$Set1
+20,scala.collection.immutable.Set$Set2
+21,scala.collection.immutable.Set$Set3
+22,scala.collection.immutable.Set$Set4
+23,scala.collection.immutable.HashSet$HashTrieSet
+24,scala.collection.immutable.Map$Map1
+25,scala.collection.immutable.Map$Map2
+26,scala.collection.immutable.Map$Map3
+27,scala.collection.immutable.Map$Map4
+28,scala.collection.immutable.HashMap$HashTrieMap
+29,scala.collection.immutable.Range$Inclusive
+30,scala.collection.immutable.NumericRange$Inclusive
+31,scala.collection.immutable.NumericRange$Exclusive
+32,scala.collection.mutable.BitSet
+33,scala.collection.mutable.HashMap
+34,scala.collection.mutable.HashSet
+35,scala.collection.convert.Wrappers$IterableWrapper
+36,scala.Tuple1
+37,scala.Tuple2
+38,scala.Tuple3
+39,scala.Tuple4
+40,scala.Tuple5
+41,scala.Tuple6
+42,scala.Tuple7
+43,scala.Tuple8
+44,scala.Tuple9
+45,scala.Tuple10
+46,scala.Tuple11
+47,scala.Tuple12
+48,scala.Tuple13
+49,scala.Tuple14
+50,scala.Tuple15
+51,scala.Tuple16
+52,scala.Tuple17
+53,scala.Tuple18
+54,scala.Tuple19
+55,scala.Tuple20
+56,scala.Tuple21
+57,scala.Tuple22
+58,scala.Tuple1$mcJ$sp
+59,scala.Tuple1$mcI$sp
+60,scala.Tuple1$mcD$sp
+61,scala.Tuple2$mcJJ$sp
+62,scala.Tuple2$mcJI$sp
+63,scala.Tuple2$mcJD$sp
+64,scala.Tuple2$mcIJ$sp
+65,scala.Tuple2$mcII$sp
+66,scala.Tuple2$mcID$sp
+67,scala.Tuple2$mcDJ$sp
+68,scala.Tuple2$mcDI$sp
+69,scala.Tuple2$mcDD$sp
+70,scala.Symbol
+71,scala.reflect.ClassTag
+72,scala.runtime.BoxedUnit
+73,java.util.Arrays$ArrayList
+74,java.util.BitSet
+75,java.util.PriorityQueue
+76,java.util.regex.Pattern
+77,java.sql.Date
+78,java.sql.Time
+79,java.sql.Timestamp
+80,java.net.URI
+81,java.net.InetSocketAddress
+82,java.util.UUID
+83,java.util.Locale
+84,java.text.SimpleDateFormat
+85,org.apache.avro.generic.GenericData$Array
http://git-wip-us.apache.org/repos/asf/flink/blob/8fddae8d/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
index 08a0a96..e001799 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala
@@ -17,17 +17,19 @@
*/
package org.apache.flink.api.scala.runtime
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Input, Output}
+import java.io._
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.SerializerTestInstance
import org.apache.flink.api.java.typeutils.GenericTypeInfo
-
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.joda.time.LocalDate
-
import org.junit.Test
+import scala.collection.mutable
+import scala.io.Source
import scala.reflect._
class KryoGenericTypeSerializerTest {
@@ -146,6 +148,96 @@ class KryoGenericTypeSerializerTest {
runTests(list)
}
+ /**
+ * Tests that the registered classes in Kryo did not change.
+ *
+ * Once we have proper serializer versioning this test will become obsolete.
+ * But currently a change in the serializers can break savepoint backwards
+ * compatability between Flink versions.
+ */
+ @Test
+ def testDefaultKryoRegisteredClassesDidNotChange(): Unit = {
+ // Previous registration (id => registered class (Class#getName))
+ val previousRegistrations: mutable.HashMap[Int, String] = mutable.HashMap[Int, String]()
+
+ val stream = Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream("flink_11-kryo_registrations")
+ Source.fromInputStream(stream).getLines().foreach{
+ line =>
+ val Array(id, registeredClass) = line.split(",")
+ previousRegistrations.put(id.toInt, registeredClass)
+ }
+
+ // Get Kryo and verify that the registered IDs and types in
+ // Kryo have not changed compared to the provided registrations
+ // file.
+ val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo
+ val nextId = kryo.getNextRegistrationId
+ for (i <- 0 until nextId) {
+ val registration = kryo.getRegistration(i)
+
+ previousRegistrations.get(registration.getId) match {
+ case None => throw new IllegalStateException(s"Expected no entry with ID " +
+ s"${registration.getId}, but got one for type ${registration.getType.getName}. This " +
+ s"can lead to registered user types being deserialized with the wrong serializer when " +
+ s"restoring a savepoint.")
+ case Some(registeredClass) =>
+ if (registeredClass != registration.getType.getName) {
+ throw new IllegalStateException(s"Expected type ${registration.getType.getName} with " +
+ s"ID ${registration.getId}, but got $registeredClass.")
+ }
+ }
+ }
+
+ // Verify number of registrations (required to check if current number of
+ // registrations is less than before).
+ if (previousRegistrations.size != nextId) {
+ throw new IllegalStateException(s"Number of registered classes changed (previously " +
+ s"${previousRegistrations.size}, but now $nextId). This can lead to registered user " +
+ s"types being deserialized with the wrong serializer when restoring a savepoint.")
+ }
+ }
+
+ /**
+ * Creates a Kryo serializer and writes the default registrations out to a
+ * comma separated file with one entry per line:
+ *
+ * id,class
+ *
+ * The produced file is used to check that the registered IDs don't change
+ * in future Flink versions.
+ *
+ * This method is not used in the tests, but documents how the test file
+ * has been created and can be used to re-create it if needed.
+ *
+ * @param filePath File path to write registrations to
+ */
+ private def writeDefaultKryoRegistrations(filePath: String) = {
+ val file = new File(filePath)
+ if (file.exists()) {
+ file.delete()
+ }
+
+ val writer = new BufferedWriter(new FileWriter(file))
+
+ try {
+ val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo
+
+ val nextId = kryo.getNextRegistrationId
+ for (i <- 0 until nextId) {
+ val registration = kryo.getRegistration(i)
+ val str = registration.getId + "," + registration.getType.getName
+ writer.write(str, 0, str.length)
+ writer.newLine()
+ }
+
+ println(s"Created file with registrations at $file.")
+ } finally {
+ writer.close()
+ }
+ }
+
+
case class ComplexType(id: String, number: Int, values: List[Int]){
override def equals(obj: Any): Boolean ={
if(obj != null && obj.isInstanceOf[ComplexType]){
http://git-wip-us.apache.org/repos/asf/flink/blob/8fddae8d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6c74d6c..5f27ee5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -881,6 +881,7 @@ under the License.
<!-- Test Data. -->
<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
+ <exclude>flink-tests/src/test/resources/flink_11-kryo_registrations</exclude>
<exclude>flink-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude>
<exclude>out/test/flink-avro/avro/user.avsc</exclude>
<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>