You are viewing a plain-text version of this content; the link to its canonical version ("here") was not preserved in this rendering.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:13 UTC

[flink] 08/18: [FLINK-11329] [DataStream] Migrating the UnionSerializer to use new compatibility API

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 392f2cd57380f9070414f4d6d120e4b051d4e061
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 16:40:40 2019 +0100

    [FLINK-11329] [DataStream] Migrating the UnionSerializer to use new compatibility API
---
 .../streaming/api/datastream/CoGroupedStreams.java | 110 +++++++++++++--------
 .../datastream/UnionSerializerMigrationTest.java   |  67 +++++++++++++
 .../test/resources/flink-1.6-union-serializer-data |   1 +
 .../resources/flink-1.6-union-serializer-snapshot  | Bin 0 -> 1393 bytes
 .../test/resources/flink-1.7-union-serializer-data |   1 +
 .../resources/flink-1.7-union-serializer-snapshot  | Bin 0 -> 1403 bytes
 6 files changed, 138 insertions(+), 41 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 19d9783..3ef1861 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -504,7 +503,9 @@ public class CoGroupedStreams<T1, T2> {
 		}
 	}
 
-	private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
+	@VisibleForTesting
+	@Internal
+	static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
 		private static final long serialVersionUID = 1L;
 
 		private final TypeSerializer<T1> oneSerializer;
@@ -618,63 +619,90 @@ public class CoGroupedStreams<T1, T2> {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
-			return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
-		}
-
-		@Override
-		public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-			if (configSnapshot instanceof UnionSerializerConfigSnapshot) {
-				List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
-					((UnionSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
-
-				CompatibilityResult<T1> oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousSerializersAndConfigs.get(0).f0,
-					UnloadableDummyTypeSerializer.class,
-					previousSerializersAndConfigs.get(0).f1,
-					oneSerializer);
-
-				CompatibilityResult<T2> twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousSerializersAndConfigs.get(1).f0,
-					UnloadableDummyTypeSerializer.class,
-					previousSerializersAndConfigs.get(1).f1,
-					twoSerializer);
-
-				if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				} else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) {
-					return CompatibilityResult.requiresMigration(
-						new UnionSerializer<>(
-							new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
-							new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer())));
-				}
-			}
-
-			return CompatibilityResult.requiresMigration();
+		public TypeSerializerSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
+			return new UnionSerializerSnapshot<>(this); // new-style snapshot; replaces the deprecated UnionSerializerConfigSnapshot
 		}
 	}
 
 	/**
 	 * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+	 *             It is fully replaced by {@link UnionSerializerSnapshot}.
 	 */
+	@Deprecated
 	public static class UnionSerializerConfigSnapshot<T1, T2>
-			extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {
+		extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {

 		private static final int VERSION = 1;

-		/** This empty nullary constructor is required for deserializing the configuration. */
-		public UnionSerializerConfigSnapshot() {}
+		/**
+		 * This empty nullary constructor is required for deserializing the configuration.
+		 */
+		public UnionSerializerConfigSnapshot() {
+		}

 		public UnionSerializerConfigSnapshot(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) {
 			super(oneSerializer, twoSerializer);
 		}

 		@Override
+		public TypeSerializerSchemaCompatibility<TaggedUnion<T1, T2>> resolveSchemaCompatibility(TypeSerializer<TaggedUnion<T1, T2>> newSerializer) {
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( // legacy snapshot hands the check to a fresh UnionSerializerSnapshot
+				newSerializer,
+				new UnionSerializerSnapshot<>(),
+				nestedSerializersAndConfigs.get(0).f1, // snapshot of the first nested serializer (oneSerializer)
+				nestedSerializersAndConfigs.get(1).f1 // snapshot of the second nested serializer (twoSerializer)
+			);
+		}
+
+		@Override
 		public int getVersion() {
 			return VERSION;
 		}
 	}
 
+	/**
+	 * The {@link TypeSerializerSnapshot} for the {@link UnionSerializer}; its nested serializers are {@code oneSerializer} and {@code twoSerializer}, in that order.
+	 */
+	public static class UnionSerializerSnapshot<T1, T2>
+		extends CompositeTypeSerializerSnapshot<TaggedUnion<T1, T2>, UnionSerializer<T1, T2>> {
+
+		private static final int VERSION = 2; // outer snapshot version, reported by getCurrentOuterSnapshotVersion()
+
+		@SuppressWarnings("WeakerAccess")
+		public UnionSerializerSnapshot() { // nullary form; also used by UnionSerializerConfigSnapshot#resolveSchemaCompatibility
+			super(correspondingSerializerClass());
+		}
+
+		UnionSerializerSnapshot(UnionSerializer<T1, T2> serializerInstance) { // used by UnionSerializer#snapshotConfiguration()
+			super(serializerInstance);
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(UnionSerializer<T1, T2> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.oneSerializer, outerSerializer.twoSerializer}; // order must match the indices read in createOuterSerializerWithNestedSerializers
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		protected UnionSerializer<T1, T2> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new UnionSerializer<>((TypeSerializer<T1>) nestedSerializers[0], (TypeSerializer<T2>) nestedSerializers[1]);
+		}
+
+		@SuppressWarnings("unchecked")
+		private static <T1, T2> Class<UnionSerializer<T1, T2>> correspondingSerializerClass() {
+			return (Class<UnionSerializer<T1, T2>>) (Class<?>) UnionSerializer.class; // cast through Class<?> because UnionSerializer.class is a raw Class
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utility functions that implement the CoGroup logic based on the tagged
 	//  union window reduce
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java
new file mode 100644
index 0000000..2b85d7e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * State migration tests for {@link UnionSerializer}, parameterized over {@code MigrationVersion.v1_6} and {@code v1_7}.
+ */
+@RunWith(Parameterized.class)
+public class UnionSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<TaggedUnion<String, Long>> {
+
+	public UnionSerializerMigrationTest(TestSpecification<TaggedUnion<String, Long>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"union-serializer", // presumably resolves to the flink-<version>-union-serializer-{snapshot,data} resources; TODO confirm
+			UnionSerializer.class,
+			UnionSerializerSnapshot.class,
+			UnionSerializerMigrationTest::stringLongRowSupplier);
+
+		return testSpecifications.get();
+	}
+
+	private static TypeSerializer<TaggedUnion<String, Long>> stringLongRowSupplier() { // NOTE(review): misnamed; supplies a UnionSerializer<String, Long>, not a Row serializer
+		return new UnionSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE);
+	}
+
+}
+
+
+
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data
new file mode 100644
index 0000000..cb29a99
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data
@@ -0,0 +1 @@
+53778725243338537787315927955377873178348653778731961974537787321387515377873232740853778732510096537787327007985377873288617553778733069270
\ No newline at end of file
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot
new file mode 100644
index 0000000..178d007
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data
new file mode 100644
index 0000000..d2f04fd
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data
@@ -0,0 +1 @@
+53711258937562537112614539985371126164162753711261837231537112620123325371126218543153711262413321537112625282785371126263382753711262735393
\ No newline at end of file
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot
new file mode 100644
index 0000000..747eb82
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot differ