You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:13 UTC
[flink] 08/18: [FLINK-11329] [DataStream] Migrating the UnionSerializer to use new compatibility API
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 392f2cd57380f9070414f4d6d120e4b051d4e061
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 16:40:40 2019 +0100
[FLINK-11329] [DataStream] Migrating the UnionSerializer to use new compatibility API
---
.../streaming/api/datastream/CoGroupedStreams.java | 110 +++++++++++++--------
.../datastream/UnionSerializerMigrationTest.java | 67 +++++++++++++
.../test/resources/flink-1.6-union-serializer-data | 1 +
.../resources/flink-1.6-union-serializer-snapshot | Bin 0 -> 1393 bytes
.../test/resources/flink-1.7-union-serializer-data | 1 +
.../resources/flink-1.7-union-serializer-snapshot | Bin 0 -> 1403 bytes
6 files changed, 138 insertions(+), 41 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 19d9783..3ef1861 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -504,7 +503,9 @@ public class CoGroupedStreams<T1, T2> {
}
}
- private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
+ @VisibleForTesting
+ @Internal
+ static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
private static final long serialVersionUID = 1L;
private final TypeSerializer<T1> oneSerializer;
@@ -618,63 +619,90 @@ public class CoGroupedStreams<T1, T2> {
}
@Override
- public TypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
- return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
- }
-
- @Override
- public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
- if (configSnapshot instanceof UnionSerializerConfigSnapshot) {
- List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
- ((UnionSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
-
- CompatibilityResult<T1> oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousSerializersAndConfigs.get(0).f0,
- UnloadableDummyTypeSerializer.class,
- previousSerializersAndConfigs.get(0).f1,
- oneSerializer);
-
- CompatibilityResult<T2> twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousSerializersAndConfigs.get(1).f0,
- UnloadableDummyTypeSerializer.class,
- previousSerializersAndConfigs.get(1).f1,
- twoSerializer);
-
- if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new UnionSerializer<>(
- new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
- new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer())));
- }
- }
-
- return CompatibilityResult.requiresMigration();
+ public TypeSerializerSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
+ return new UnionSerializerSnapshot<>(this);
}
}
/**
* The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.
+ *
+ * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+ * It is fully replaced by {@link UnionSerializerSnapshot}.
*/
+ @Deprecated
public static class UnionSerializerConfigSnapshot<T1, T2>
- extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {
+ extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {
private static final int VERSION = 1;
- /** This empty nullary constructor is required for deserializing the configuration. */
- public UnionSerializerConfigSnapshot() {}
+ /**
+ * This empty nullary constructor is required for deserializing the configuration.
+ */
+ public UnionSerializerConfigSnapshot() {
+ }
public UnionSerializerConfigSnapshot(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) {
super(oneSerializer, twoSerializer);
}
@Override
+ public TypeSerializerSchemaCompatibility<TaggedUnion<T1, T2>> resolveSchemaCompatibility(TypeSerializer<TaggedUnion<T1, T2>> newSerializer) {
+ List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
+
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new UnionSerializerSnapshot<>(),
+ nestedSerializersAndConfigs.get(0).f1,
+ nestedSerializersAndConfigs.get(1).f1
+ );
+ }
+
+ @Override
public int getVersion() {
return VERSION;
}
}
+ /**
+ * The {@link TypeSerializerSnapshot} for the {@link UnionSerializer}.
+ */
+ public static class UnionSerializerSnapshot<T1, T2>
+ extends CompositeTypeSerializerSnapshot<TaggedUnion<T1, T2>, UnionSerializer<T1, T2>> {
+
+ private static final int VERSION = 2;
+
+ @SuppressWarnings("WeakerAccess")
+ public UnionSerializerSnapshot() {
+ super(correspondingSerializerClass());
+ }
+
+ UnionSerializerSnapshot(UnionSerializer<T1, T2> serializerInstance) {
+ super(serializerInstance);
+ }
+
+ @Override
+ protected int getCurrentOuterSnapshotVersion() {
+ return VERSION;
+ }
+
+ @Override
+ protected TypeSerializer<?>[] getNestedSerializers(UnionSerializer<T1, T2> outerSerializer) {
+ return new TypeSerializer[]{outerSerializer.oneSerializer, outerSerializer.twoSerializer};
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected UnionSerializer<T1, T2> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ return new UnionSerializer<>((TypeSerializer<T1>) nestedSerializers[0], (TypeSerializer<T2>) nestedSerializers[1]);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T1, T2> Class<UnionSerializer<T1, T2>> correspondingSerializerClass() {
+ return (Class<UnionSerializer<T1, T2>>) (Class<?>) UnionSerializer.class;
+ }
+ }
+
// ------------------------------------------------------------------------
// Utility functions that implement the CoGroup logic based on the tagged
// union window reduce
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java
new file mode 100644
index 0000000..2b85d7e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * State migration tests for {@link UnionSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class UnionSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<TaggedUnion<String, Long>> {
+
+ public UnionSerializerMigrationTest(TestSpecification<TaggedUnion<String, Long>> testSpecification) {
+ super(testSpecification);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Parameterized.Parameters(name = "Test Specification = {0}")
+ public static Collection<TestSpecification<?>> testSpecifications() {
+
+ final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+ testSpecifications.add(
+ "union-serializer",
+ UnionSerializer.class,
+ UnionSerializerSnapshot.class,
+ UnionSerializerMigrationTest::stringLongRowSupplier);
+
+ return testSpecifications.get();
+ }
+
+ private static TypeSerializer<TaggedUnion<String, Long>> stringLongRowSupplier() {
+ return new UnionSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE);
+ }
+
+}
+
+
+
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data
new file mode 100644
index 0000000..cb29a99
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data
@@ -0,0 +1 @@
+53778725243338537787315927955377873178348653778731961974537787321387515377873232740853778732510096537787327007985377873288617553778733069270
\ No newline at end of file
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot
new file mode 100644
index 0000000..178d007
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data
new file mode 100644
index 0000000..d2f04fd
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data
@@ -0,0 +1 @@
+53711258937562537112614539985371126164162753711261837231537112620123325371126218543153711262413321537112625282785371126263382753711262735393
\ No newline at end of file
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot
new file mode 100644
index 0000000..747eb82
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot differ