You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 17:22:49 UTC

[02/12] flink git commit: [FLINK-6844] [scala] Implement compatibility methods for TraversableSerializer

[FLINK-6844] [scala] Implement compatibility methods for TraversableSerializer

This closes #4081.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1e207c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1e207c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1e207c8

Branch: refs/heads/release-1.3
Commit: e1e207c898ed436df656d01364cf0e5fa818b730
Parents: 53e69b4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jun 5 20:52:57 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 18:51:51 2017 +0200

----------------------------------------------------------------------
 .../scala/typeutils/TraversableSerializer.scala | 41 ++++++++++++++++++--
 1 file changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1e207c8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 1ac46f9..6299a24 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.typeutils
 import java.io.ObjectInputStream
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.api.common.typeutils._
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 import scala.collection.generic.CanBuildFrom
@@ -152,11 +152,46 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
   }
 
   override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    throw new UnsupportedOperationException()
+    new TraversableSerializer.TraversableSerializerConfigSnapshot[E](elementSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
-    throw new UnsupportedOperationException()
+
+    configSnapshot match {
+      case traversableSerializerConfigSnapshot:
+          TraversableSerializer.TraversableSerializerConfigSnapshot[E] =>
+
+        val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+          traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
+          classOf[UnloadableDummyTypeSerializer[_]],
+          traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
+          elementSerializer)
+
+        if (elemCompatRes.isRequiresMigration) {
+          CompatibilityResult.requiresMigration()
+        } else {
+          CompatibilityResult.compatible()
+        }
+
+      case _ => CompatibilityResult.requiresMigration()
+    }
+  }
+}
+
+object TraversableSerializer {
+
+  class TraversableSerializerConfigSnapshot[E](
+      private var elementSerializer: TypeSerializer[E])
+    extends CompositeTypeSerializerConfigSnapshot(elementSerializer) {
+
+    /** This empty nullary constructor is required for deserializing the configuration. */
+    def this() = this(null)
+
+    override def getVersion = TraversableSerializerConfigSnapshot.VERSION
+  }
+
+  object TraversableSerializerConfigSnapshot {
+    val VERSION = 1
   }
 }