You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:19 UTC

[flink] 14/18: [FLINK-11329] [DataStream] Migrate TimerSerializer to use new compatibility API

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7abaff3516b4afd34e15c6c238fd1ebaae253325
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 30 17:35:45 2019 +0800

    [FLINK-11329] [DataStream] Migrate TimerSerializer to use new compatibility API
---
 .../streaming/api/operators/TimerSerializer.java   |  67 ++++++++-------------
 .../api/operators/TimerSerializerSnapshot.java     |  66 ++++++++++++++++++++
 .../TimerSerializerSnapshotMigrationTest.java      |  61 +++++++++++++++++++
 .../test/resources/flink-1.6-timer-serializer-data | Bin 0 -> 240 bytes
 .../resources/flink-1.6-timer-serializer-snapshot  | Bin 0 -> 1406 bytes
 .../test/resources/flink-1.7-timer-serializer-data | Bin 0 -> 240 bytes
 .../resources/flink-1.7-timer-serializer-snapshot  | Bin 0 -> 1414 bytes
 7 files changed, 152 insertions(+), 42 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
index b641c09..fb8e0a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.MathUtils;
@@ -33,7 +30,6 @@ import org.apache.flink.util.MathUtils;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 
 /**
@@ -209,42 +205,8 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot<TimerHeapInternalTimer<K, N>> snapshotConfiguration() {
-		return new TimerSerializerConfigSnapshot<>(keySerializer, namespaceSerializer);
-	}
-
-	@Override
-	public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(
-		TypeSerializerConfigSnapshot configSnapshot) {
-
-		if (configSnapshot instanceof TimerSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
-				((TimerSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
-
-			if (previousSerializersAndConfigs.size() == 2) {
-				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> keySerializerAndSnapshot =
-					previousSerializersAndConfigs.get(KEY_SERIALIZER_SNAPSHOT_INDEX);
-				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> namespaceSerializerAndSnapshot =
-					previousSerializersAndConfigs.get(NAMESPACE_SERIALIZER_SNAPSHOT_INDEX);
-				CompatibilityResult<K> keyCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-					keySerializerAndSnapshot.f0,
-					UnloadableDummyTypeSerializer.class,
-					keySerializerAndSnapshot.f1,
-					keySerializer);
-
-				CompatibilityResult<N> namespaceCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-					namespaceSerializerAndSnapshot.f0,
-					UnloadableDummyTypeSerializer.class,
-					namespaceSerializerAndSnapshot.f1,
-					namespaceSerializer);
-
-				if (!keyCompatibilityResult.isRequiresMigration()
-					&& !namespaceCompatibilityResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				}
-			}
-		}
-		return CompatibilityResult.requiresMigration();
+	public TimerSerializerSnapshot<K, N> snapshotConfiguration() {
+		return new TimerSerializerSnapshot<>(this);
 	}
 
 	@Nonnull
@@ -262,7 +224,12 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 	 *
 	 * @param <K> type of key.
 	 * @param <N> type of namespace.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only
+	 *             for backwards compatibility purposes. It is fully replaced by
+	 *             {@link TimerSerializerSnapshot}.
 	 */
+	@Deprecated
 	public static class TimerSerializerConfigSnapshot<K, N> extends CompositeTypeSerializerConfigSnapshot<TimerHeapInternalTimer<K, N>> {
 
 		private static final int VERSION = 1;
@@ -289,5 +256,21 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 		public int getVersion() {
 			return VERSION;
 		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<TimerHeapInternalTimer<K, N>> resolveSchemaCompatibility(
+				TypeSerializer<TimerHeapInternalTimer<K, N>> newSerializer) {
+
+			final TypeSerializerSnapshot<?>[] nestedSnapshots = getNestedSerializersAndConfigs()
+				.stream()
+				.map(t -> t.f1) // f1 is the TypeSerializerSnapshot half of each (serializer, snapshot) pair
+				.toArray(TypeSerializerSnapshot[]::new);
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new TimerSerializerSnapshot<>(), // no-arg instance of the snapshot class that replaces this deprecated one
+				nestedSnapshots
+			);
+		}
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
new file mode 100644
index 0000000..c28b045
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Snapshot class for the {@link TimerSerializer}; its nested serializers are the key serializer and the namespace serializer, in that order.
+ */
+@Internal
+public class TimerSerializerSnapshot<K, N> extends CompositeTypeSerializerSnapshot<TimerHeapInternalTimer<K, N>, TimerSerializer<K, N>> {
+
+	private static final int VERSION = 2; // value reported by getCurrentOuterSnapshotVersion()
+
+	public TimerSerializerSnapshot() {
+		super(correspondingSerializerClass());
+	}
+
+	public TimerSerializerSnapshot(TimerSerializer<K, N> timerSerializer) {
+		super(timerSerializer);
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return VERSION;
+	}
+
+	@Override
+	protected TimerSerializer<K, N> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		final TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0]; // index 0: key serializer, matching getNestedSerializers
+
+		@SuppressWarnings("unchecked")
+		final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) nestedSerializers[1]; // index 1: namespace serializer
+
+		return new TimerSerializer<K, N>(keySerializer, namespaceSerializer);
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(TimerSerializer<K, N> outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getNamespaceSerializer() }; // order: [key, namespace]
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <K, N> Class<TimerSerializer<K, N>> correspondingSerializerClass() {
+		return (Class<TimerSerializer<K, N>>) (Class<?>) TimerSerializer.class; // class literal is raw, so cast via Class<?> to attach type arguments
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshotMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..6b07a09
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshotMigrationTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration test for {@link TimerSerializer}, parameterized over {@code MigrationVersion.v1_6} and {@code MigrationVersion.v1_7}.
+ */
+@RunWith(Parameterized.class)
+public class TimerSerializerSnapshotMigrationTest
+	extends TypeSerializerSnapshotMigrationTestBase<TimerHeapInternalTimer<String, Integer>> {
+
+	public TimerSerializerSnapshotMigrationTest(
+		TestSpecification<TimerHeapInternalTimer<String, Integer>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"timer-serializer", // NOTE(review): presumably the infix of the flink-<version>-timer-serializer-* resource files - confirm against the test base
+			TimerSerializer.class,
+			TimerSerializerSnapshot.class,
+			TimerSerializerSnapshotMigrationTest::stringIntTimerSerializerSupplier);
+
+		return testSpecifications.get();
+	}
+
+	private static TypeSerializer<TimerHeapInternalTimer<String, Integer>> stringIntTimerSerializerSupplier() {
+		return new TimerSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE); // String keys, Integer namespaces
+	}
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-data
new file mode 100644
index 0000000..02f84d9
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-snapshot
new file mode 100644
index 0000000..fee77f2
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-data
new file mode 100644
index 0000000..9069def
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-snapshot
new file mode 100644
index 0000000..d5f3fda
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-snapshot differ