You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 16:30:02 UTC
[05/12] flink git commit: [FLINK-6844] [scala] Implement compatibility methods for TraversableSerializer
[FLINK-6844] [scala] Implement compatibility methods for TraversableSerializer
This closes #4081.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c11d5ed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c11d5ed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c11d5ed5
Branch: refs/heads/master
Commit: c11d5ed5388a5a30ca4ea0c5ac68e22e5989cb54
Parents: bdffde3
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Jun 5 20:52:57 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 18:28:58 2017 +0200
----------------------------------------------------------------------
.../scala/typeutils/TraversableSerializer.scala | 41 ++++++++++++++++++--
1 file changed, 38 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c11d5ed5/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 1ac46f9..6299a24 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.typeutils
import java.io.ObjectInputStream
import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.api.common.typeutils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import scala.collection.generic.CanBuildFrom
@@ -152,11 +152,46 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
}
override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
- throw new UnsupportedOperationException()
+ new TraversableSerializer.TraversableSerializerConfigSnapshot[E](elementSerializer)
}
override def ensureCompatibility(
configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
- throw new UnsupportedOperationException()
+
+ configSnapshot match {
+ case traversableSerializerConfigSnapshot:
+ TraversableSerializer.TraversableSerializerConfigSnapshot[E] =>
+
+ val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+ traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
+ classOf[UnloadableDummyTypeSerializer[_]],
+ traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
+ elementSerializer)
+
+ if (elemCompatRes.isRequiresMigration) {
+ CompatibilityResult.requiresMigration()
+ } else {
+ CompatibilityResult.compatible()
+ }
+
+ case _ => CompatibilityResult.requiresMigration()
+ }
+ }
+}
+
+object TraversableSerializer {
+
+ class TraversableSerializerConfigSnapshot[E](
+ private var elementSerializer: TypeSerializer[E])
+ extends CompositeTypeSerializerConfigSnapshot(elementSerializer) {
+
+ /** This empty nullary constructor is required for deserializing the configuration. */
+ def this() = this(null)
+
+ override def getVersion = TraversableSerializerConfigSnapshot.VERSION
+ }
+
+ object TraversableSerializerConfigSnapshot {
+ val VERSION = 1
}
}