You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:16 UTC
[flink] 11/18: [FLINK-11329] [scala] Migrating
ScalaOptionSerializer to use new compatibility API
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a34d46344dccfb7ba34e00337e7c0270cf09e416
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Mon Jan 28 16:56:50 2019 +0100
[FLINK-11329] [scala] Migrating ScalaOptionSerializer to use new compatibility API
---
.../ScalaOptionSerializerConfigSnapshot.java | 11 ++++
.../typeutils/ScalaOptionSerializerSnapshot.java | 62 +++++++++++++++++++++
.../api/scala/typeutils/OptionSerializer.scala | 41 +-------------
...ScalaOptionSerializerSnapshotMigrationTest.java | 58 +++++++++++++++++++
.../flink-1.6-scala-option-serializer-data | Bin 0 -> 31 bytes
.../flink-1.6-scala-option-serializer-snapshot | Bin 0 -> 876 bytes
.../flink-1.7-scala-option-serializer-data | Bin 0 -> 31 bytes
.../flink-1.7-scala-option-serializer-snapshot | Bin 0 -> 877 bytes
8 files changed, 133 insertions(+), 39 deletions(-)
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
index 215bd44..e6bc88c 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -19,8 +19,10 @@
package org.apache.flink.api.scala.typeutils;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import scala.Option;
@@ -31,6 +33,7 @@ import scala.Option;
* allow calling different base class constructors from subclasses, while we need that
* for the default empty constructor.
*/
+@Deprecated
public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Option<E>> {
private static final int VERSION = 1;
@@ -46,4 +49,12 @@ public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeS
public int getVersion() {
return VERSION;
}
+
+ @Override
+ public TypeSerializerSchemaCompatibility<Option<E>> resolveSchemaCompatibility(TypeSerializer<Option<E>> newSerializer) {
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new ScalaOptionSerializerSnapshot<>(),
+ getSingleNestedSerializerAndConfig().f1);
+ }
}
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
new file mode 100644
index 0000000..dfa9178
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import scala.Option;
+
+/**
+ * A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
+ */
+public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Option<E>, OptionSerializer<E>> {
+
+ private static final int VERSION = 2;
+
+ @SuppressWarnings("WeakerAccess")
+ public ScalaOptionSerializerSnapshot() {
+ super(underlyingClass());
+ }
+
+ public ScalaOptionSerializerSnapshot(OptionSerializer<E> serializerInstance) {
+ super(serializerInstance);
+ }
+
+ @Override
+ protected int getCurrentOuterSnapshotVersion() {
+ return VERSION;
+ }
+
+ @Override
+ protected TypeSerializer<?>[] getNestedSerializers(OptionSerializer<E> outerSerializer) {
+ return new TypeSerializer[]{outerSerializer.elemSerializer()};
+ }
+
+ @Override
+ protected OptionSerializer<E> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked") TypeSerializer<E> nestedSerializer = (TypeSerializer<E>) nestedSerializers[0];
+ return new OptionSerializer<>(nestedSerializer);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <E> Class<OptionSerializer<E>> underlyingClass() {
+ return (Class<OptionSerializer<E>>) (Class<?>) OptionSerializer.class;
+ }
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 7f3aa8c..ea8f22a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------
- override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = {
- new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
- }
-
- override def ensureCompatibility(
- configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = {
-
- configSnapshot match {
- case optionSerializerConfigSnapshot
- : ScalaOptionSerializerConfigSnapshot[A] =>
- ensureCompatibilityInternal(optionSerializerConfigSnapshot)
- case legacyOptionSerializerConfigSnapshot
- : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
- ensureCompatibilityInternal(legacyOptionSerializerConfigSnapshot)
- case _ => CompatibilityResult.requiresMigration()
- }
- }
-
- private def ensureCompatibilityInternal(
- compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot[Option[A]])
- : CompatibilityResult[Option[A]] = {
-
- val compatResult = CompatibilityUtil.resolveCompatibilityResult(
- compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
- classOf[UnloadableDummyTypeSerializer[_]],
- compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
- elemSerializer)
-
- if (compatResult.isRequiresMigration) {
- if (compatResult.getConvertDeserializer != null) {
- CompatibilityResult.requiresMigration(
- new OptionSerializer[A](
- new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
- } else {
- CompatibilityResult.requiresMigration()
- }
- } else {
- CompatibilityResult.compatible()
- }
+ override def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]] = {
+ new ScalaOptionSerializerSnapshot[A](this)
}
}
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..efc94ec
--- /dev/null
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+import scala.Option;
+
+/**
+ * Migration test for the {@link ScalaEitherSerializerSnapshot}.
+ */
+@RunWith(Parameterized.class)
+public class ScalaOptionSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Option<String>> {
+
+ private static final String SPEC_NAME = "scala-option-serializer";
+
+ public ScalaOptionSerializerSnapshotMigrationTest(TestSpecification<Option<String>> testSpecification) {
+ super(testSpecification);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Parameterized.Parameters(name = "Test Specification = {0}")
+ public static Collection<TestSpecification<?>> testSpecifications() {
+
+ final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+ testSpecifications.add(
+ SPEC_NAME,
+ OptionSerializer.class,
+ ScalaOptionSerializerSnapshot.class,
+ () -> new OptionSerializer<>(StringSerializer.INSTANCE));
+
+ return testSpecifications.get();
+ }
+}
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data
new file mode 100644
index 0000000..3cdb252
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data differ
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot
new file mode 100644
index 0000000..d7be0c2
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data
new file mode 100644
index 0000000..3cdb252
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot
new file mode 100644
index 0000000..bdedf0e
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot differ