You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 09:49:50 UTC

flink git commit: [hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase

Repository: flink
Updated Branches:
  refs/heads/master 68ac96e16 -> 3bad77c0a


[hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bad77c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bad77c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bad77c0

Branch: refs/heads/master
Commit: 3bad77c0ae932a926260b769efb151a89fc309ab
Parents: 68ac96e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Tue Jun 13 11:47:44 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 11:49:34 2017 +0200

----------------------------------------------------------------------
 .../scala/typeutils/EnumValueSerializer.scala   |  31 ++++++++++---------
 .../api/scala/typeutils/TrySerializer.scala     |   2 +-
 .../_metadata                                   | Bin 213855 -> 218895 bytes
 .../_metadata                                   | Bin 213855 -> 218895 bytes
 .../_metadata                                   | Bin 213855 -> 218119 bytes
 .../_metadata                                   | Bin 213855 -> 218119 bytes
 6 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3bad77c0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 50526f5..119db93 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -78,8 +78,7 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
   // --------------------------------------------------------------------------------------------
 
   override def snapshotConfiguration(): EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = {
-    new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](
-      enum.getClass.asInstanceOf[Class[E]])
+    new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](enum)
   }
 
   override def ensureCompatibility(
@@ -89,14 +88,16 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
       case enumSerializerConfigSnapshot: EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] =>
         val enumClass = enum.getClass.asInstanceOf[Class[E]]
         if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) {
-          val currentEnumConstants = enumSerializerConfigSnapshot.getEnumClass.getEnumConstants
+          val previousEnumConstants = enumSerializerConfigSnapshot.getEnumConstants
 
-          for ( i <- 0 to currentEnumConstants.length) {
-            // compatible only if new enum constants are only appended,
-            // and original constants must be in the exact same order
+          if (previousEnumConstants != null) {
+            for (i <- enum.values.iterator) {
+              if (!previousEnumConstants(i.id).equals(i.toString)) {
+                // compatible only if new enum constants are only appended,
+                // and original constants must be in the exact same order
 
-            if (currentEnumConstants(i) != enumSerializerConfigSnapshot.getEnumConstants(i)) {
-              return CompatibilityResult.requiresMigration()
+                return CompatibilityResult.requiresMigration()
+              }
             }
           }
 
@@ -116,12 +117,12 @@ object EnumValueSerializer {
       extends TypeSerializerConfigSnapshot {
 
     var enumClass: Class[E] = _
-    var enumConstants: Array[E] = _
+    var enumConstants: List[String] = _
 
-    def this(enumClass: Class[E]) = {
+    def this(enum: E) = {
       this()
-      this.enumClass = Preconditions.checkNotNull(enumClass)
-      this.enumConstants = enumClass.getEnumConstants
+      this.enumClass = Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]]
+      this.enumConstants = enum.values.toList.map(_.toString)
     }
 
     override def write(out: DataOutputView): Unit = {
@@ -160,7 +161,7 @@ object EnumValueSerializer {
 
     def getEnumClass: Class[E] = enumClass
 
-    def getEnumConstants: Array[E] = enumConstants
+    def getEnumConstants: List[String] = enumConstants
 
     override def equals(obj: scala.Any): Boolean = {
       if (obj == this) {
@@ -173,12 +174,12 @@ object EnumValueSerializer {
 
       obj.isInstanceOf[ScalaEnumSerializerConfigSnapshot[E]] &&
         enumClass.equals(obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumClass) &&
-        enumConstants.sameElements(
+        enumConstants.equals(
           obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumConstants)
     }
 
     override def hashCode(): Int = {
-      enumClass.hashCode() * 31 + enumConstants.toSeq.hashCode()
+      enumClass.hashCode() * 31 + enumConstants.hashCode()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bad77c0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index e128157..cc9c5cc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -124,7 +124,7 @@ class TrySerializer[A](
 
   private def ensureCompatibility(
       compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
-      : CompatibilityResult[Option[A]] = {
+        : CompatibilityResult[Try[A]] = {
 
     val previousSerializersAndConfigs =
       compositeConfigSnapshot.getNestedSerializersAndConfigs

http://git-wip-us.apache.org/repos/asf/flink/blob/3bad77c0/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
index e183e51..f4db7ff 100644
Binary files a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/3bad77c0/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
index 612bc1b..459b89e 100644
Binary files a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/3bad77c0/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
index 6adf433..eb87c0b 100644
Binary files a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/3bad77c0/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
index d9eaa72..6a33832 100644
Binary files a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ