You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:17 UTC
[flink] 12/18: [FLINK-11329] [scala] Delegate compatibility check
on old OptionSerializerConfigSnapshot to ScalaOptionSerializerSnapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 198ceb27751f95ee7f6f870eb4f700c3b97ff746
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 30 15:59:45 2019 +0800
[FLINK-11329] [scala] Delegate compatibility check on old OptionSerializerConfigSnapshot to ScalaOptionSerializerSnapshot
This closes #7590.
---
.../scala/typeutils/ScalaOptionSerializerConfigSnapshot.java | 3 +++
.../api/scala/typeutils/ScalaOptionSerializerSnapshot.java | 3 ++-
.../apache/flink/api/scala/typeutils/OptionSerializer.scala | 10 ++++++++++
3 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
index e6bc88c..3e5bd95 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -32,6 +32,9 @@ import scala.Option;
* <p>This configuration snapshot class is implemented in Java because Scala does not
* allow calling different base class constructors from subclasses, while we need that
* for the default empty constructor.
+ *
+ * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+ * It is fully replaced by {@link ScalaOptionSerializerSnapshot}.
*/
@Deprecated
public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Option<E>> {
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
index dfa9178..47a72a2 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
@@ -20,11 +20,12 @@ package org.apache.flink.api.scala.typeutils;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import scala.Option;
/**
- * A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
+ * A {@link TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
*/
public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Option<E>, OptionSerializer<E>> {
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index ea8f22a..d41d2e8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -116,6 +116,16 @@ object OptionSerializer {
extends CompositeTypeSerializerConfigSnapshot[Option[A]] {
override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
+
+ override def resolveSchemaCompatibility(
+ newSerializer: TypeSerializer[Option[A]]
+ ): TypeSerializerSchemaCompatibility[Option[A]] = {
+ CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new ScalaOptionSerializerSnapshot[A](),
+ getSingleNestedSerializerAndConfig.f1
+ )
+ }
}
object OptionSerializerConfigSnapshot {