You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:16 UTC

[flink] 11/18: [FLINK-11329] [scala] Migrating ScalaOptionSerializer to use new compatibility API

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a34d46344dccfb7ba34e00337e7c0270cf09e416
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Mon Jan 28 16:56:50 2019 +0100

    [FLINK-11329] [scala] Migrating ScalaOptionSerializer to use new compatibility API
---
 .../ScalaOptionSerializerConfigSnapshot.java       |  11 ++++
 .../typeutils/ScalaOptionSerializerSnapshot.java   |  62 +++++++++++++++++++++
 .../api/scala/typeutils/OptionSerializer.scala     |  41 +-------------
 ...ScalaOptionSerializerSnapshotMigrationTest.java |  58 +++++++++++++++++++
 .../flink-1.6-scala-option-serializer-data         | Bin 0 -> 31 bytes
 .../flink-1.6-scala-option-serializer-snapshot     | Bin 0 -> 876 bytes
 .../flink-1.7-scala-option-serializer-data         | Bin 0 -> 31 bytes
 .../flink-1.7-scala-option-serializer-snapshot     | Bin 0 -> 877 bytes
 8 files changed, 133 insertions(+), 39 deletions(-)

diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
index 215bd44..e6bc88c 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -19,8 +19,10 @@
 package org.apache.flink.api.scala.typeutils;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 
 import scala.Option;
 
@@ -31,6 +33,7 @@ import scala.Option;
  * allow calling different base class constructors from subclasses, while we need that
  * for the default empty constructor.
  */
+@Deprecated
 public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Option<E>> {
 
 	private static final int VERSION = 1;
@@ -46,4 +49,12 @@ public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeS
 	public int getVersion() {
 		return VERSION;
 	}
+
+	@Override
+	public TypeSerializerSchemaCompatibility<Option<E>> resolveSchemaCompatibility(TypeSerializer<Option<E>> newSerializer) {
+		return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+			newSerializer,
+			new ScalaOptionSerializerSnapshot<>(),
+			getSingleNestedSerializerAndConfig().f1);
+	}
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
new file mode 100644
index 0000000..dfa9178
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import scala.Option;
+
+/**
+ * A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
+ */
+public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Option<E>, OptionSerializer<E>> {
+
+	private static final int VERSION = 2;
+
+	@SuppressWarnings("WeakerAccess")
+	public ScalaOptionSerializerSnapshot() {
+		super(underlyingClass());
+	}
+
+	public ScalaOptionSerializerSnapshot(OptionSerializer<E> serializerInstance) {
+		super(serializerInstance);
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return VERSION;
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(OptionSerializer<E> outerSerializer) {
+		return new TypeSerializer[]{outerSerializer.elemSerializer()};
+	}
+
+	@Override
+	protected OptionSerializer<E> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked") TypeSerializer<E> nestedSerializer = (TypeSerializer<E>) nestedSerializers[0];
+		return new OptionSerializer<>(nestedSerializer);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <E> Class<OptionSerializer<E>> underlyingClass() {
+		return (Class<OptionSerializer<E>>) (Class<?>) OptionSerializer.class;
+	}
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 7f3aa8c..ea8f22a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = {
-    new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
-  }
-
-  override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = {
-
-    configSnapshot match {
-      case optionSerializerConfigSnapshot
-          : ScalaOptionSerializerConfigSnapshot[A] =>
-        ensureCompatibilityInternal(optionSerializerConfigSnapshot)
-      case legacyOptionSerializerConfigSnapshot
-          : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
-        ensureCompatibilityInternal(legacyOptionSerializerConfigSnapshot)
-      case _ => CompatibilityResult.requiresMigration()
-    }
-  }
-
-  private def ensureCompatibilityInternal(
-      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot[Option[A]])
-      : CompatibilityResult[Option[A]] = {
-
-    val compatResult = CompatibilityUtil.resolveCompatibilityResult(
-      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
-      classOf[UnloadableDummyTypeSerializer[_]],
-      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
-      elemSerializer)
-
-    if (compatResult.isRequiresMigration) {
-      if (compatResult.getConvertDeserializer != null) {
-        CompatibilityResult.requiresMigration(
-          new OptionSerializer[A](
-            new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
-      } else {
-        CompatibilityResult.requiresMigration()
-      }
-    } else {
-      CompatibilityResult.compatible()
-    }
+  override def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]] = {
+    new ScalaOptionSerializerSnapshot[A](this)
   }
 }
 
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..efc94ec
--- /dev/null
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+import scala.Option;
+
+/**
+ * Migration test for the {@link ScalaEitherSerializerSnapshot}.
+ */
+@RunWith(Parameterized.class)
+public class ScalaOptionSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Option<String>> {
+
+	private static final String SPEC_NAME = "scala-option-serializer";
+
+	public ScalaOptionSerializerSnapshotMigrationTest(TestSpecification<Option<String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			OptionSerializer.class,
+			ScalaOptionSerializerSnapshot.class,
+			() -> new OptionSerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data
new file mode 100644
index 0000000..3cdb252
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data differ
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot
new file mode 100644
index 0000000..d7be0c2
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data
new file mode 100644
index 0000000..3cdb252
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot
new file mode 100644
index 0000000..bdedf0e
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot differ