You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:17 UTC

[flink] 12/18: [FLINK-11329] [scala] Delegate compatibility check on old OptionSerializerConfigSnapshot to ScalaOptionSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 198ceb27751f95ee7f6f870eb4f700c3b97ff746
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 30 15:59:45 2019 +0800

    [FLINK-11329] [scala] Delegate compatibility check on old OptionSerializerConfigSnapshot to ScalaOptionSerializerSnapshot
    
    This closes #7590.
---
 .../scala/typeutils/ScalaOptionSerializerConfigSnapshot.java   |  3 +++
 .../api/scala/typeutils/ScalaOptionSerializerSnapshot.java     |  3 ++-
 .../apache/flink/api/scala/typeutils/OptionSerializer.scala    | 10 ++++++++++
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
index e6bc88c..3e5bd95 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -32,6 +32,9 @@ import scala.Option;
  * <p>This configuration snapshot class is implemented in Java because Scala does not
  * allow calling different base class constructors from subclasses, while we need that
  * for the default empty constructor.
+ *
+ * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+ *             It is fully replaced by {@link ScalaOptionSerializerSnapshot}.
  */
 @Deprecated
 public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Option<E>> {
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
index dfa9178..47a72a2 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
@@ -20,11 +20,12 @@ package org.apache.flink.api.scala.typeutils;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 
 import scala.Option;
 
 /**
- * A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
+ * A {@link TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
  */
 public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Option<E>, OptionSerializer<E>> {
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index ea8f22a..d41d2e8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -116,6 +116,16 @@ object OptionSerializer {
       extends CompositeTypeSerializerConfigSnapshot[Option[A]] {
 
     override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
+
+    override def resolveSchemaCompatibility(
+        newSerializer: TypeSerializer[Option[A]]
+    ): TypeSerializerSchemaCompatibility[Option[A]] = {
+      CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+        newSerializer,
+        new ScalaOptionSerializerSnapshot[A](),
+        getSingleNestedSerializerAndConfig.f1
+      )
+    }
   }
 
   object OptionSerializerConfigSnapshot {