You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/20 12:38:22 UTC

[2/3] flink git commit: [FLINK-6948] [serializer] Harden EnumValueSerializer to detect changed enum indices

[FLINK-6948] [serializer] Harden EnumValueSerializer to detect changed enum indices

This PR changes the serialization format of the ScalaEnumSerializerConfigSnapshot to also include the
ordinal value of each enum value when it is serialized. This makes it possible to detect whether the ordinal
values have changed and, thus, whether migration is required.

IMPORTANT: This PR changes the serialization format of ScalaEnumSerializerConfigSnapshot.

Remove backwards compatibility path for 1.3.1

This closes #4142.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/228faf8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/228faf8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/228faf8c

Branch: refs/heads/master
Commit: 228faf8c2a113de0dea366668e87484dbe7ec8b5
Parents: e520023
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jun 19 12:49:05 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 20 20:37:13 2017 +0800

----------------------------------------------------------------------
 .../scala/typeutils/EnumValueSerializer.scala   |  65 ++++++++++++++-----
 .../src/test/resources/log4j-test.properties    |  31 +++++++++
 .../EnumValueSerializerUpgradeTest.scala        |  20 ++++++
 .../_metadata                                   | Bin 218895 -> 217183 bytes
 .../_metadata                                   | Bin 218895 -> 217183 bytes
 .../_metadata                                   | Bin 218119 -> 217183 bytes
 .../_metadata                                   | Bin 218119 -> 217183 bytes
 7 files changed, 99 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/228faf8c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 8be5e0b..344b56d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -26,6 +26,8 @@ import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOut
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 import org.apache.flink.util.{InstantiationUtil, Preconditions}
 
+import scala.collection.mutable.ListBuffer
+
 /**
  * Serializer for [[Enumeration]] values.
  */
@@ -88,18 +90,23 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
       case enumSerializerConfigSnapshot: EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] =>
         val enumClass = enum.getClass.asInstanceOf[Class[E]]
         if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) {
-          val previousEnumConstants = enumSerializerConfigSnapshot.getEnumConstants
+          val previousEnumConstants:List[(String, Int)] =
+            enumSerializerConfigSnapshot.getEnumConstants
 
           if (previousEnumConstants != null) {
-            for (i <- enum.values.iterator) {
-              // skip the check for all newly added fields
-              if (i.id < previousEnumConstants.length) {
-                if (!previousEnumConstants(i.id).equals(i.toString)) {
-                  // compatible only if new enum constants are only appended,
-                  // and original constants must be in the exact same order
-
+            for ((previousEnumConstant, idx) <- previousEnumConstants) {
+              val enumValue = try {
+                enum(idx)
+              } catch {
+                case _: NoSuchElementException =>
+                  // couldn't find an enum value for the given index
                   return CompatibilityResult.requiresMigration()
-                }
+              }
+
+              if (!previousEnumConstant.equals(enumValue.toString)) {
+                // compatible only if new enum constants are only appended,
+                // and original constants must be in the exact same order
+                return CompatibilityResult.requiresMigration()
               }
             }
           }
@@ -120,12 +127,12 @@ object EnumValueSerializer {
       extends TypeSerializerConfigSnapshot {
 
     var enumClass: Class[E] = _
-    var enumConstants: List[String] = _
+    var enumConstants: List[(String, Int)] = _
 
     def this(enum: E) = {
       this()
       this.enumClass = Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]]
-      this.enumConstants = enum.values.toList.map(_.toString)
+      this.enumConstants = enum.values.toList.map(x => (x.toString, x.id))
     }
 
     override def write(out: DataOutputView): Unit = {
@@ -135,7 +142,12 @@ object EnumValueSerializer {
         val outViewWrapper = new DataOutputViewStream(out)
         try {
           InstantiationUtil.serializeObject(outViewWrapper, enumClass)
-          InstantiationUtil.serializeObject(outViewWrapper, enumConstants)
+
+          out.writeInt(enumConstants.length)
+          for ((name, idx) <- enumConstants) {
+            out.writeUTF(name)
+            out.writeInt(idx)
+          }
         } finally if (outViewWrapper != null) outViewWrapper.close()
       }
     }
@@ -150,8 +162,24 @@ object EnumValueSerializer {
             enumClass = InstantiationUtil.deserializeObject(
               inViewWrapper, getUserCodeClassLoader)
 
-            enumConstants = InstantiationUtil.deserializeObject(
-              inViewWrapper, getUserCodeClassLoader)
+            if (getReadVersion == 1) {
+              // read null from input stream
+              InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader)
+              enumConstants = List()
+            } else if (getReadVersion == 2) {
+              val length = in.readInt()
+              val listBuffer = ListBuffer[(String, Int)]()
+
+              for (_ <- 0 until length) {
+                val name = in.readUTF()
+                val idx = in.readInt()
+                listBuffer += ((name, idx))
+              }
+
+              enumConstants = listBuffer.toList
+            } else {
+              throw new IOException(s"Cannot deserialize ${getClass.getSimpleName} with version $getReadVersion.")
+            }
           } catch {
             case e: ClassNotFoundException =>
               throw new IOException("The requested enum class cannot be found in classpath.", e)
@@ -164,7 +192,7 @@ object EnumValueSerializer {
 
     def getEnumClass: Class[E] = enumClass
 
-    def getEnumConstants: List[String] = enumConstants
+    def getEnumConstants: List[(String, Int)] = enumConstants
 
     override def equals(obj: scala.Any): Boolean = {
       if (obj == this) {
@@ -184,10 +212,13 @@ object EnumValueSerializer {
     override def hashCode(): Int = {
       enumClass.hashCode() * 31 + enumConstants.hashCode()
     }
+
+    override def getCompatibleVersions: Array[Int] = {
+      Array(1, 2)
+    }
   }
 
   object ScalaEnumSerializerConfigSnapshot {
-    val VERSION = 1
+    val VERSION = 2
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/228faf8c/flink-scala/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/resources/log4j-test.properties b/flink-scala/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..10792cd
--- /dev/null
+++ b/flink-scala/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/228faf8c/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
index c470cd0..af725f6 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
@@ -74,6 +74,16 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
        |}
     """.stripMargin
 
+  val enumE =
+    s"""
+       |@SerialVersionUID(1L)
+       |object $enumName extends Enumeration {
+       |  val A = Value(42)
+       |  val B = Value(5)
+       |  val C = Value(1337)
+       |}
+    """.stripMargin
+
   /**
     * Check that identical enums don't require migration
     */
@@ -106,6 +116,16 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     assertTrue(checkCompatibility(enumA, enumD).isRequiresMigration)
   }
 
+  /**
+    * Check that changing the enum ids causes a migration
+    */
+  @Test
+  def checkDifferentIds(): Unit = {
+    assertTrue(
+      "Different ids should cause a migration.",
+      checkCompatibility(enumA, enumE).isRequiresMigration)
+  }
+
   def checkCompatibility(enumSourceA: String, enumSourceB: String)
     : CompatibilityResult[Enumeration#Value] = {
     import EnumValueSerializerUpgradeTest._

http://git-wip-us.apache.org/repos/asf/flink/blob/228faf8c/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
index f4db7ff..c18fd09 100644
Binary files a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/228faf8c/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
index 459b89e..301725b 100644
Binary files a/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/stateful-scala2.10-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/228faf8c/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata
index eb87c0b..e154243 100644
Binary files a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-jobmanager-savepoint/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/228faf8c/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata
index 6a33832..08aa333 100644
Binary files a/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata and b/flink-tests/src/test/resources/stateful-scala2.11-udf-migration-itcase-flink1.3-rocksdb-savepoint/_metadata differ