You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:15 UTC
[flink] 10/18: [FLINK-11329] [DataStream] Migrate StreamElementSerializer to use new compatibility API
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 17618f323a2fafcf66022e3a66e56a2d84ce5cb6
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Mon Jan 28 16:32:14 2019 +0100
[FLINK-11329] [DataStream] Migrate StreamElementSerializer to use new compatibility API
---
.../streamrecord/StreamElementSerializer.java | 93 ++++++++++++---------
.../StreamElementSerializerMigrationTest.java | 55 ++++++++++++
.../flink-1.6-stream-element-serializer-data | Bin 0 -> 158 bytes
.../flink-1.6-stream-element-serializer-snapshot | Bin 0 -> 931 bytes
.../flink-1.7-stream-element-serializer-data | Bin 0 -> 158 bytes
.../flink-1.7-stream-element-serializer-snapshot | Bin 0 -> 932 bytes
6 files changed, 109 insertions(+), 39 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index f84ff15a..e8af0f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -19,15 +19,11 @@
package org.apache.flink.streaming.runtime.streamrecord;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -283,56 +279,75 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
// --------------------------------------------------------------------------------------------
@Override
- public StreamElementSerializerConfigSnapshot<T> snapshotConfiguration() {
- return new StreamElementSerializerConfigSnapshot<>(typeSerializer);
+ public StreamElementSerializerSnapshot<T> snapshotConfiguration() {
+ return new StreamElementSerializerSnapshot<>(this);
}
- @Override
- public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
- Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousTypeSerializerAndConfig;
+ /**
+ * Configuration snapshot specific to the {@link StreamElementSerializer}.
+ * @deprecated see {@link StreamElementSerializerSnapshot}.
+ */
+ @Deprecated
+ public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {
- // we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer
- if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
- previousTypeSerializerAndConfig =
- ((StreamElementSerializerConfigSnapshot<?>) configSnapshot).getSingleNestedSerializerAndConfig();
- } else {
- return CompatibilityResult.requiresMigration();
- }
+ private static final int VERSION = 1;
- CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousTypeSerializerAndConfig.f0,
- UnloadableDummyTypeSerializer.class,
- previousTypeSerializerAndConfig.f1,
- typeSerializer);
+ /** This empty nullary constructor is required for deserializing the configuration. */
+ public StreamElementSerializerConfigSnapshot() {}
- if (!compatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (compatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new StreamElementSerializer<>(
- new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
- } else {
- return CompatibilityResult.requiresMigration();
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public TypeSerializerSchemaCompatibility<StreamElement> resolveSchemaCompatibility(TypeSerializer<StreamElement> newSerializer) {
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new StreamElementSerializerSnapshot<>(),
+ getSingleNestedSerializerAndConfig().f1);
}
}
/**
* Configuration snapshot specific to the {@link StreamElementSerializer}.
*/
- public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {
+ public static final class StreamElementSerializerSnapshot<T>
+ extends CompositeTypeSerializerSnapshot<StreamElement, StreamElementSerializer<T>> {
- private static final int VERSION = 1;
+ private static final int VERSION = 2;
- /** This empty nullary constructor is required for deserializing the configuration. */
- public StreamElementSerializerConfigSnapshot() {}
+ @SuppressWarnings("WeakerAccess")
+ public StreamElementSerializerSnapshot() {
+ super(serializerClass());
+ }
- public StreamElementSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
- super(typeSerializer);
+ StreamElementSerializerSnapshot(StreamElementSerializer<T> serializerInstance) {
+ super(serializerInstance);
}
@Override
- public int getVersion() {
+ protected int getCurrentOuterSnapshotVersion() {
return VERSION;
}
+
+ @Override
+ protected TypeSerializer<?>[] getNestedSerializers(StreamElementSerializer<T> outerSerializer) {
+ return new TypeSerializer[]{outerSerializer.getContainedTypeSerializer()};
+ }
+
+ @Override
+ protected StreamElementSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked")
+ TypeSerializer<T> casted = (TypeSerializer<T>) nestedSerializers[0];
+
+ return new StreamElementSerializer<>(casted);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Class<StreamElementSerializer<T>> serializerClass() {
+ Class<?> streamElementSerializerClass = StreamElementSerializer.class;
+ return (Class<StreamElementSerializer<T>>) streamElementSerializerClass;
+ }
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java
new file mode 100644
index 0000000..b6169d4
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.StreamElementSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration tests for {@link StreamElementSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class StreamElementSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<StreamElement> {
+
+ public StreamElementSerializerMigrationTest(TestSpecification<StreamElement> testSpecification) {
+ super(testSpecification);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Parameterized.Parameters(name = "Test Specification = {0}")
+ public static Collection<TestSpecification<?>> testSpecifications() {
+
+ final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+ testSpecifications.add(
+ "stream-element-serializer",
+ StreamElementSerializer.class,
+ StreamElementSerializerSnapshot.class,
+ () -> new StreamElementSerializer<>(StringSerializer.INSTANCE));
+
+ return testSpecifications.get();
+ }
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data
new file mode 100644
index 0000000..81b80c3
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot
new file mode 100644
index 0000000..8ffdb43
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data
new file mode 100644
index 0000000..01f05e7
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot
new file mode 100644
index 0000000..dc7f76b
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot differ