You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:15 UTC

[flink] 10/18: [FLINK-11329] [DataStream] Migrate StreamElementSerializer to use new compatibility API

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 17618f323a2fafcf66022e3a66e56a2d84ce5cb6
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Mon Jan 28 16:32:14 2019 +0100

    [FLINK-11329] [DataStream] Migrate StreamElementSerializer to use new compatibility API
---
 .../streamrecord/StreamElementSerializer.java      |  93 ++++++++++++---------
 .../StreamElementSerializerMigrationTest.java      |  55 ++++++++++++
 .../flink-1.6-stream-element-serializer-data       | Bin 0 -> 158 bytes
 .../flink-1.6-stream-element-serializer-snapshot   | Bin 0 -> 931 bytes
 .../flink-1.7-stream-element-serializer-data       | Bin 0 -> 158 bytes
 .../flink-1.7-stream-element-serializer-snapshot   | Bin 0 -> 932 bytes
 6 files changed, 109 insertions(+), 39 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index f84ff15a..e8af0f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -19,15 +19,11 @@
 package org.apache.flink.streaming.runtime.streamrecord;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -283,56 +279,75 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public StreamElementSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new StreamElementSerializerConfigSnapshot<>(typeSerializer);
+	public StreamElementSerializerSnapshot<T> snapshotConfiguration() {
+		return new StreamElementSerializerSnapshot<>(this);
 	}
 
-	@Override
-	public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousTypeSerializerAndConfig;
+	/**
+	 * Configuration snapshot specific to the {@link StreamElementSerializer}.
+	 * @deprecated see {@link StreamElementSerializerSnapshot}.
+	 */
+	@Deprecated
+	public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {
 
-		// we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer
-		if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
-			previousTypeSerializerAndConfig =
-				((StreamElementSerializerConfigSnapshot<?>) configSnapshot).getSingleNestedSerializerAndConfig();
-		} else {
-			return CompatibilityResult.requiresMigration();
-		}
+		private static final int VERSION = 1;
 
-		CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-				previousTypeSerializerAndConfig.f0,
-				UnloadableDummyTypeSerializer.class,
-				previousTypeSerializerAndConfig.f1,
-				typeSerializer);
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public StreamElementSerializerConfigSnapshot() {}
 
-		if (!compatResult.isRequiresMigration()) {
-			return CompatibilityResult.compatible();
-		} else if (compatResult.getConvertDeserializer() != null) {
-			return CompatibilityResult.requiresMigration(
-				new StreamElementSerializer<>(
-					new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-		} else {
-			return CompatibilityResult.requiresMigration();
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<StreamElement> resolveSchemaCompatibility(TypeSerializer<StreamElement> newSerializer) {
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new StreamElementSerializerSnapshot<>(),
+				getSingleNestedSerializerAndConfig().f1);
 		}
 	}
 
 	/**
 	 * Configuration snapshot specific to the {@link StreamElementSerializer}.
 	 */
-	public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {
+	public static final class StreamElementSerializerSnapshot<T>
+		extends CompositeTypeSerializerSnapshot<StreamElement, StreamElementSerializer<T>> {
 
-		private static final int VERSION = 1;
+		private static final int VERSION = 2;
 
-		/** This empty nullary constructor is required for deserializing the configuration. */
-		public StreamElementSerializerConfigSnapshot() {}
+		@SuppressWarnings("WeakerAccess")
+		public StreamElementSerializerSnapshot() {
+			super(serializerClass());
+		}
 
-		public StreamElementSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
-			super(typeSerializer);
+		StreamElementSerializerSnapshot(StreamElementSerializer<T> serializerInstance) {
+			super(serializerInstance);
 		}
 
 		@Override
-		public int getVersion() {
+		protected int getCurrentOuterSnapshotVersion() {
 			return VERSION;
 		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(StreamElementSerializer<T> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.getContainedTypeSerializer()};
+		}
+
+		@Override
+		protected StreamElementSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			@SuppressWarnings("unchecked")
+			TypeSerializer<T> casted = (TypeSerializer<T>) nestedSerializers[0];
+
+			return new StreamElementSerializer<>(casted);
+		}
+
+		@SuppressWarnings("unchecked")
+		private static <T> Class<StreamElementSerializer<T>> serializerClass() {
+			Class<?> streamElementSerializerClass = StreamElementSerializer.class;
+			return (Class<StreamElementSerializer<T>>) streamElementSerializerClass;
+		}
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java
new file mode 100644
index 0000000..b6169d4
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.StreamElementSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration tests for {@link StreamElementSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class StreamElementSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<StreamElement> {
+
+	public StreamElementSerializerMigrationTest(TestSpecification<StreamElement> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"stream-element-serializer",
+			StreamElementSerializer.class,
+			StreamElementSerializerSnapshot.class,
+			() -> new StreamElementSerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data
new file mode 100644
index 0000000..81b80c3
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot
new file mode 100644
index 0000000..8ffdb43
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data
new file mode 100644
index 0000000..01f05e7
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot
new file mode 100644
index 0000000..dc7f76b
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot differ