You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:14 UTC

[flink] 09/18: [FLINK-11329] [DataStream] Migrate BufferEntrySerializer to use new compatibility API

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a51adc3886d62109b1b7582733ab91e6e175a58c
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 20:54:23 2019 +0100

    [FLINK-11329] [DataStream] Migrate BufferEntrySerializer to use new compatibility API
---
 .../api/operators/co/IntervalJoinOperator.java     | 103 +++++++++++++--------
 .../co/BufferEntrySerializerMigrationTest.java     |  57 ++++++++++++
 .../flink-1.6-buffer-entry-serializer-data         | Bin 0 -> 160 bytes
 .../flink-1.6-buffer-entry-serializer-snapshot     | Bin 0 -> 935 bytes
 .../flink-1.7-buffer-entry-serializer-data         | Bin 0 -> 160 bytes
 .../flink-1.7-buffer-entry-serializer-snapshot     | Bin 0 -> 936 bytes
 6 files changed, 123 insertions(+), 37 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 43085cb..8f79c17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -22,17 +22,15 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -361,7 +359,9 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 	 * This will contain the element itself along with a flag indicating
 	 * if it has been joined or not.
 	 */
-	private static class BufferEntry<T> {
+	@Internal
+	@VisibleForTesting
+	static class BufferEntry<T> {
 
 		private final T element;
 		private final boolean hasBeenJoined;
@@ -375,13 +375,15 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 	/**
 	 * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
 	 */
-	private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
+	@Internal
+	@VisibleForTesting
+	static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
 
 		private static final long serialVersionUID = -20197698803836236L;
 
 		private final TypeSerializer<T> elementSerializer;
 
-		private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+		BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
 			this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
 		}
 
@@ -464,40 +466,19 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			return new BufferSerializerConfigSnapshot<>(elementSerializer);
-		}
-
-		@Override
-		public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
-				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
-						((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-				CompatibilityResult<T> compatResult =
-						CompatibilityUtil.resolveCompatibilityResult(
-								previousSerializerAndConfig.f0,
-								UnloadableDummyTypeSerializer.class,
-								previousSerializerAndConfig.f1,
-								elementSerializer);
-
-				if (!compatResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				} else if (compatResult.getConvertDeserializer() != null) {
-					return CompatibilityResult.requiresMigration(
-							new BufferEntrySerializer<>(
-									new TypeDeserializerAdapter<>(
-											compatResult.getConvertDeserializer())));
-				}
-			}
-			return CompatibilityResult.requiresMigration();
+		public TypeSerializerSnapshot<BufferEntry<T>> snapshotConfiguration() {
+			return new BufferEntrySerializerSnapshot<>(this);
 		}
 	}
 
 	/**
 	 * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+	 *             It is fully replaced by {@link BufferEntrySerializerSnapshot}.
 	 */
-	public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+	@Deprecated
+	public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<BufferEntry<T>> {
 
 		private static final int VERSION = 1;
 
@@ -512,6 +493,54 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 		public int getVersion() {
 			return VERSION;
 		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<BufferEntry<T>> resolveSchemaCompatibility(TypeSerializer<BufferEntry<T>> newSerializer) {
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new BufferEntrySerializerSnapshot<>(),
+				getSingleNestedSerializerAndConfig().f1);
+		}
+	}
+
+	/**
+	 * A {@link TypeSerializerSnapshot} for {@link BufferEntrySerializer}.
+	 */
+	public static final class BufferEntrySerializerSnapshot<T>
+		extends CompositeTypeSerializerSnapshot<BufferEntry<T>, BufferEntrySerializer<T>> {
+
+		private static final int VERSION = 2;
+
+		@SuppressWarnings({"unused", "WeakerAccess"})
+		public BufferEntrySerializerSnapshot() {
+			super(correspondingSerializerClass());
+		}
+
+		BufferEntrySerializerSnapshot(BufferEntrySerializer<T> serializerInstance) {
+			super(serializerInstance);
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(BufferEntrySerializer<T> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.elementSerializer};
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		protected BufferEntrySerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new BufferEntrySerializer<>((TypeSerializer<T>) nestedSerializers[0]);
+		}
+
+		@SuppressWarnings("unchecked")
+		private static <T> Class<BufferEntrySerializer<T>> correspondingSerializerClass() {
+			return (Class<BufferEntrySerializer<T>>) (Class<?>) BufferEntrySerializer.class;
+		}
 	}
 
 	@VisibleForTesting
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
new file mode 100644
index 0000000..d4d2673
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntry;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializer;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * State migration tests for {@link BufferEntrySerializer}.
+ */
+@RunWith(Parameterized.class)
+public class BufferEntrySerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<BufferEntry<String>> {
+
+	public BufferEntrySerializerMigrationTest(TestSpecification<BufferEntry<String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"buffer-entry-serializer",
+			BufferEntrySerializer.class,
+			BufferEntrySerializerSnapshot.class,
+			() -> new BufferEntrySerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data
new file mode 100644
index 0000000..a4af1fc
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot
new file mode 100644
index 0000000..6141180
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data
new file mode 100644
index 0000000..36c9dc7
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot
new file mode 100644
index 0000000..af92e1b
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot differ