You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:14 UTC
[flink] 09/18: [FLINK-11329] [DataStream] Migrate
BufferEntrySerializer to use new compatibility API
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a51adc3886d62109b1b7582733ab91e6e175a58c
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 20:54:23 2019 +0100
[FLINK-11329] [DataStream] Migrate BufferEntrySerializer to use new compatibility API
---
.../api/operators/co/IntervalJoinOperator.java | 103 +++++++++++++--------
.../co/BufferEntrySerializerMigrationTest.java | 57 ++++++++++++
.../flink-1.6-buffer-entry-serializer-data | Bin 0 -> 160 bytes
.../flink-1.6-buffer-entry-serializer-snapshot | Bin 0 -> 935 bytes
.../flink-1.7-buffer-entry-serializer-data | Bin 0 -> 160 bytes
.../flink-1.7-buffer-entry-serializer-snapshot | Bin 0 -> 936 bytes
6 files changed, 123 insertions(+), 37 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 43085cb..8f79c17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -22,17 +22,15 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -361,7 +359,9 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
* This will contain the element itself along with a flag indicating
* if it has been joined or not.
*/
- private static class BufferEntry<T> {
+ @Internal
+ @VisibleForTesting
+ static class BufferEntry<T> {
private final T element;
private final boolean hasBeenJoined;
@@ -375,13 +375,15 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
/**
* A {@link TypeSerializer serializer} for the {@link BufferEntry}.
*/
- private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
+ @Internal
+ @VisibleForTesting
+ static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
private static final long serialVersionUID = -20197698803836236L;
private final TypeSerializer<T> elementSerializer;
- private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+ BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
}
@@ -464,40 +466,19 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
}
@Override
- public TypeSerializerConfigSnapshot snapshotConfiguration() {
- return new BufferSerializerConfigSnapshot<>(elementSerializer);
- }
-
- @Override
- public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
- Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
- ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
- CompatibilityResult<T> compatResult =
- CompatibilityUtil.resolveCompatibilityResult(
- previousSerializerAndConfig.f0,
- UnloadableDummyTypeSerializer.class,
- previousSerializerAndConfig.f1,
- elementSerializer);
-
- if (!compatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (compatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new BufferEntrySerializer<>(
- new TypeDeserializerAdapter<>(
- compatResult.getConvertDeserializer())));
- }
- }
- return CompatibilityResult.requiresMigration();
+ public TypeSerializerSnapshot<BufferEntry<T>> snapshotConfiguration() {
+ return new BufferEntrySerializerSnapshot<>(this);
}
}
/**
* The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
+ *
+ * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+ * It is fully replaced by {@link BufferEntrySerializerSnapshot}.
*/
- public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+ @Deprecated
+ public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<BufferEntry<T>> {
private static final int VERSION = 1;
@@ -512,6 +493,54 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
public int getVersion() {
return VERSION;
}
+
+ @Override
+ public TypeSerializerSchemaCompatibility<BufferEntry<T>> resolveSchemaCompatibility(TypeSerializer<BufferEntry<T>> newSerializer) {
+
+ return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new BufferEntrySerializerSnapshot<>(),
+ getSingleNestedSerializerAndConfig().f1);
+ }
+ }
+
+ /**
+ * A {@link TypeSerializerSnapshot} for {@link BufferEntrySerializer}.
+ */
+ public static final class BufferEntrySerializerSnapshot<T>
+ extends CompositeTypeSerializerSnapshot<BufferEntry<T>, BufferEntrySerializer<T>> {
+
+ private static final int VERSION = 2;
+
+ @SuppressWarnings({"unused", "WeakerAccess"})
+ public BufferEntrySerializerSnapshot() {
+ super(correspondingSerializerClass());
+ }
+
+ BufferEntrySerializerSnapshot(BufferEntrySerializer<T> serializerInstance) {
+ super(serializerInstance);
+ }
+
+ @Override
+ protected int getCurrentOuterSnapshotVersion() {
+ return VERSION;
+ }
+
+ @Override
+ protected TypeSerializer<?>[] getNestedSerializers(BufferEntrySerializer<T> outerSerializer) {
+ return new TypeSerializer[]{outerSerializer.elementSerializer};
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected BufferEntrySerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ return new BufferEntrySerializer<>((TypeSerializer<T>) nestedSerializers[0]);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Class<BufferEntrySerializer<T>> correspondingSerializerClass() {
+ return (Class<BufferEntrySerializer<T>>) (Class<?>) BufferEntrySerializer.class;
+ }
}
@VisibleForTesting
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
new file mode 100644
index 0000000..d4d2673
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntry;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializer;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * State migration tests for {@link BufferEntrySerializer}.
+ */
+@RunWith(Parameterized.class)
+public class BufferEntrySerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<BufferEntry<String>> {
+
+ public BufferEntrySerializerMigrationTest(TestSpecification<BufferEntry<String>> testSpecification) {
+ super(testSpecification);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Parameterized.Parameters(name = "Test Specification = {0}")
+ public static Collection<TestSpecification<?>> testSpecifications() {
+
+ final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+ testSpecifications.add(
+ "buffer-entry-serializer",
+ BufferEntrySerializer.class,
+ BufferEntrySerializerSnapshot.class,
+ () -> new BufferEntrySerializer<>(StringSerializer.INSTANCE));
+
+ return testSpecifications.get();
+ }
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data
new file mode 100644
index 0000000..a4af1fc
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot
new file mode 100644
index 0000000..6141180
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data
new file mode 100644
index 0000000..36c9dc7
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot
new file mode 100644
index 0000000..af92e1b
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot differ