You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/30 14:54:05 UTC

[flink] branch master updated (3abb3de -> 8b72b1f)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3abb3de  [FLINK-11469][docs] Update documentation for `Tuning Checkpoints and Large State`
     new 3bfffbf  [hotfix][cep] Close nfa in CepOperator only if not null
     new 8a8b8d8  [hotfix][cep] Split NFAStateSerializer methods into smaller ones.
     new 4e607e9  [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface
     new e49c3e0  [FLINK-11329] [DataStream] Migrating CompositeSerializer to use new compatibility API
     new d575951  [hotfix] [e2e] Add missing serialVersionUIDs to state TTL e2e test classes
     new b22e913  [FLINK-11329] [core] Migrating the NullableSerializer to use new compatibility API
     new b434b32  [FLINK-11329] [table] Migrating the RowSerializer to use new compatibility API
     new 392f2cd  [FLINK-11329] [DataStream] Migrating the UnionSerializer to use new compatibility API
     new a51adc3  [FLINK-11329] [DataStream] Migrate BufferEntrySerializer to use new compatibility API
     new 17618f3  [FLINK-11329] [DataStream] Migrate StreamElementSerializer to use new compatibility API
     new a34d463  [FLINK-11329] [scala] Migrating ScalaOptionSerializer to use new compatibility API
     new 198ceb2  [FLINK-11329] [scala] Delegate compatibility check on old OptionSerializerConfigSnapshot to ScalaOptionSerializerSnapshot
     new 5192101  [FLINK-11329] [DataStream] Migrate TwoPhaseCommitSinkFunction.StateSerializer to use new compatibility API
     new 7abaff3  [FLINK-11329] [DataStream] Migrate TimerSerializer to use new compatibility API
     new 64cbf2a  [FLINK-11322] [kafka] Use try-with-resource for short-living connections in Kafka connectors
     new 8f0cfdf  [hotfix] [docs] Fix typo in best_practices
     new e0aa151  [hotfix] [javadocs] Fix typo in RpcService
     new 8b72b1f  [hotfix] [rocksdb] Fix typo in RocksDB state migration exception message

The 18 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/best_practices.md                         |   2 +-
 .../connectors/kafka/FlinkKafkaConsumer010.java    |  16 +-
 .../streaming/connectors/kafka/Kafka08ITCase.java  |  29 +--
 .../connectors/kafka/FlinkKafkaConsumer.java       |  17 +-
 .../api/common/typeutils/CompositeSerializer.java  |  16 +-
 .../java/typeutils/runtime/NullableSerializer.java | 179 ++++++++++++-----
 .../api/java/typeutils/runtime/RowSerializer.java  | 138 +++++++------
 .../common/typeutils/CompositeSerializerTest.java  |   5 +
 .../TypeSerializerSnapshotMigrationTestBase.java   |  63 +++++-
 .../runtime/NullableSerializerMigrationTest.java   |  83 ++++++++
 .../runtime/RowSerializerMigrationTest.java        |  64 ++++++
 .../flink-1.6-nullable-not-padded-serializer-data  | Bin 0 -> 58 bytes
 ...ink-1.6-nullable-not-padded-serializer-snapshot | Bin 0 -> 944 bytes
 .../flink-1.6-nullable-padded-serializer-data      | Bin 0 -> 90 bytes
 .../flink-1.6-nullable-padded-serializer-snapshot  | Bin 0 -> 952 bytes
 .../test/resources/flink-1.6-row-serializer-data   | Bin 0 -> 240 bytes
 .../resources/flink-1.6-row-serializer-snapshot    | Bin 0 -> 1444 bytes
 .../flink-1.7-nullable-not-padded-serializer-data  | Bin 0 -> 58 bytes
 ...ink-1.7-nullable-not-padded-serializer-snapshot | Bin 0 -> 941 bytes
 .../flink-1.7-nullable-padded-serializer-data      | Bin 0 -> 90 bytes
 .../flink-1.7-nullable-padded-serializer-snapshot  | Bin 0 -> 949 bytes
 .../test/resources/flink-1.7-row-serializer-data   | Bin 0 -> 240 bytes
 .../resources/flink-1.7-row-serializer-snapshot    | Bin 0 -> 1454 bytes
 .../streaming/tests/TtlVerifyUpdateFunction.java   |   6 +-
 .../flink/streaming/tests/verify/ValueWithTs.java  |  65 +++++-
 .../apache/flink/cep/nfa/NFAStateSerializer.java   | 222 +++++++++++++--------
 .../flink/cep/nfa/NFAStateSerializerSnapshot.java  |  75 +++++++
 .../org/apache/flink/cep/operator/CepOperator.java |  10 +-
 .../cep/NFASerializerSnapshotsMigrationTest.java   |   8 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java     |   2 +-
 .../resources/flink-1.6-nfa-state-serializer-data  | Bin 0 -> 1460 bytes
 .../flink-1.6-nfa-state-serializer-snapshot        | Bin 0 -> 362 bytes
 .../resources/flink-1.7-nfa-state-serializer-data  | Bin 0 -> 1460 bytes
 .../flink-1.7-nfa-state-serializer-snapshot        | Bin 0 -> 350 bytes
 .../org/apache/flink/runtime/rpc/RpcService.java   |   2 +-
 .../flink/runtime/state/ttl/TtlStateFactory.java   |  74 ++++++-
 .../state/ttl/TtlSerializerStateMigrationTest.java |  60 ++++++
 .../test/resources/flink-1.6-ttl-serializer-data   | Bin 0 -> 230 bytes
 .../resources/flink-1.6-ttl-serializer-snapshot    | Bin 0 -> 1753 bytes
 .../test/resources/flink-1.7-ttl-serializer-data   | Bin 0 -> 230 bytes
 .../resources/flink-1.7-ttl-serializer-snapshot    | Bin 0 -> 1763 bytes
 .../ScalaOptionSerializerConfigSnapshot.java       |  14 ++
 .../typeutils/ScalaOptionSerializerSnapshot.java   |  63 ++++++
 .../api/scala/typeutils/OptionSerializer.scala     |  51 ++---
 ...ScalaOptionSerializerSnapshotMigrationTest.java |  58 ++++++
 .../flink-1.6-scala-option-serializer-data         | Bin 0 -> 31 bytes
 .../flink-1.6-scala-option-serializer-snapshot     | Bin 0 -> 876 bytes
 .../flink-1.7-scala-option-serializer-data         | Bin 0 -> 31 bytes
 .../flink-1.7-scala-option-serializer-snapshot     | Bin 0 -> 877 bytes
 .../streaming/state/AbstractRocksDBState.java      |   2 +-
 .../streaming/api/datastream/CoGroupedStreams.java | 110 ++++++----
 .../functions/sink/TwoPhaseCommitSinkFunction.java | 112 +++++++----
 .../streaming/api/operators/TimerSerializer.java   |  67 +++----
 .../api/operators/TimerSerializerSnapshot.java     |  66 ++++++
 .../api/operators/co/IntervalJoinOperator.java     | 103 ++++++----
 .../streamrecord/StreamElementSerializer.java      |  93 +++++----
 .../datastream/UnionSerializerMigrationTest.java   |  67 +++++++
 ...haseCommitSinkStateSerializerMigrationTest.java |  61 ++++++
 .../TimerSerializerSnapshotMigrationTest.java      |  61 ++++++
 .../co/BufferEntrySerializerMigrationTest.java     |  57 ++++++
 .../StreamElementSerializerMigrationTest.java      |  55 +++++
 .../flink-1.6-buffer-entry-serializer-data         | Bin 0 -> 160 bytes
 .../flink-1.6-buffer-entry-serializer-snapshot     | Bin 0 -> 935 bytes
 .../flink-1.6-stream-element-serializer-data       | Bin 0 -> 158 bytes
 .../flink-1.6-stream-element-serializer-snapshot   | Bin 0 -> 931 bytes
 .../test/resources/flink-1.6-timer-serializer-data | Bin 0 -> 240 bytes
 .../resources/flink-1.6-timer-serializer-snapshot  | Bin 0 -> 1406 bytes
 ...1.6-two-phase-commit-sink-state-serializer-data | Bin 0 -> 530 bytes
 ...two-phase-commit-sink-state-serializer-snapshot | Bin 0 -> 1430 bytes
 .../test/resources/flink-1.6-union-serializer-data |   1 +
 .../resources/flink-1.6-union-serializer-snapshot  | Bin 0 -> 1393 bytes
 .../flink-1.7-buffer-entry-serializer-data         | Bin 0 -> 160 bytes
 .../flink-1.7-buffer-entry-serializer-snapshot     | Bin 0 -> 936 bytes
 .../flink-1.7-stream-element-serializer-data       | Bin 0 -> 158 bytes
 .../flink-1.7-stream-element-serializer-snapshot   | Bin 0 -> 932 bytes
 .../test/resources/flink-1.7-timer-serializer-data | Bin 0 -> 240 bytes
 .../resources/flink-1.7-timer-serializer-snapshot  | Bin 0 -> 1414 bytes
 ...1.7-two-phase-commit-sink-state-serializer-data | Bin 0 -> 530 bytes
 ...two-phase-commit-sink-state-serializer-snapshot | Bin 0 -> 1438 bytes
 .../test/resources/flink-1.7-union-serializer-data |   1 +
 .../resources/flink-1.7-union-serializer-snapshot  | Bin 0 -> 1403 bytes
 81 files changed, 1682 insertions(+), 496 deletions(-)
 create mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java
 create mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
 create mode 100644 flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.6-row-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.6-row-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot
 create mode 100644 flink-core/src/test/resources/flink-1.7-row-serializer-data
 create mode 100644 flink-core/src/test/resources/flink-1.7-row-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-nfa-state-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-nfa-state-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-nfa-state-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-nfa-state-serializer-snapshot
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java
 create mode 100644 flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data
 create mode 100644 flink-runtime/src/test/resources/flink-1.6-ttl-serializer-snapshot
 create mode 100644 flink-runtime/src/test/resources/flink-1.7-ttl-serializer-data
 create mode 100644 flink-runtime/src/test/resources/flink-1.7-ttl-serializer-snapshot
 create mode 100644 flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
 create mode 100644 flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java
 create mode 100644 flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data
 create mode 100644 flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot
 create mode 100644 flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data
 create mode 100644 flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot
 create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkStateSerializerMigrationTest.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshotMigrationTest.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-two-phase-commit-sink-state-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-two-phase-commit-sink-state-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-two-phase-commit-sink-state-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-two-phase-commit-sink-state-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot


[flink] 07/18: [FLINK-11329] [table] Migrating the RowSerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b434b32c0875d31acf30ab156d94d11bd1379975
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 15:41:49 2019 +0100

    [FLINK-11329] [table] Migrating the RowSerializer to use new compatibility API
---
 .../api/java/typeutils/runtime/RowSerializer.java  | 138 +++++++++++----------
 .../runtime/RowSerializerMigrationTest.java        |  64 ++++++++++
 .../test/resources/flink-1.6-row-serializer-data   | Bin 0 -> 240 bytes
 .../resources/flink-1.6-row-serializer-snapshot    | Bin 0 -> 1444 bytes
 .../test/resources/flink-1.7-row-serializer-data   | Bin 0 -> 240 bytes
 .../resources/flink-1.7-row-serializer-snapshot    | Bin 0 -> 1454 bytes
 6 files changed, 139 insertions(+), 63 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index e29f681..df33402 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -15,18 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Row;
@@ -34,7 +32,6 @@ import org.apache.flink.types.Row;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.Arrays;
-import java.util.List;
 
 import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
 import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
@@ -95,7 +92,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
 			if (fromField != null) {
 				Object copy = fieldSerializers[i].copy(fromField);
 				result.setField(i, copy);
-			} else {
+			}
+			else {
 				result.setField(i, null);
 			}
 		}
@@ -123,11 +121,13 @@ public final class RowSerializer extends TypeSerializer<Row> {
 				if (reuseField != null) {
 					Object copy = fieldSerializers[i].copy(fromField, reuseField);
 					reuse.setField(i, copy);
-				} else {
+				}
+				else {
 					Object copy = fieldSerializers[i].copy(fromField);
 					reuse.setField(i, copy);
 				}
-			} else {
+			}
+			else {
 				reuse.setField(i, null);
 			}
 		}
@@ -163,7 +163,6 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		}
 	}
 
-
 	@Override
 	public Row deserialize(DataInputView source) throws IOException {
 		int len = fieldSerializers.length;
@@ -176,7 +175,8 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		for (int i = 0; i < len; i++) {
 			if (nullMask[i]) {
 				result.setField(i, null);
-			} else {
+			}
+			else {
 				result.setField(i, fieldSerializers[i].deserialize(source));
 			}
 		}
@@ -198,11 +198,13 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		for (int i = 0; i < len; i++) {
 			if (nullMask[i]) {
 				reuse.setField(i, null);
-			} else {
+			}
+			else {
 				Object reuseField = reuse.getField(i);
 				if (reuseField != null) {
 					reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source));
-				} else {
+				}
+				else {
 					reuse.setField(i, fieldSerializers[i].deserialize(source));
 				}
 			}
@@ -260,73 +262,83 @@ public final class RowSerializer extends TypeSerializer<Row> {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	// Serializer configuration snapshotting & compatibility
+	// Serializer configuration snapshoting & compatibility
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public RowSerializerConfigSnapshot snapshotConfiguration() {
-		return new RowSerializerConfigSnapshot(fieldSerializers);
+	public TypeSerializerSnapshot<Row> snapshotConfiguration() {
+		return new RowSerializerSnapshot(this);
 	}
 
-	@Override
-	public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		if (configSnapshot instanceof RowSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousFieldSerializersAndConfigs =
-				((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
-			if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) {
-				boolean requireMigration = false;
-				TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length];
-
-				CompatibilityResult<?> compatResult;
-				int i = 0;
-				for (Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> f : previousFieldSerializersAndConfigs) {
-					compatResult = CompatibilityUtil.resolveCompatibilityResult(
-							f.f0,
-							UnloadableDummyTypeSerializer.class,
-							f.f1,
-							fieldSerializers[i]);
-
-					if (compatResult.isRequiresMigration()) {
-						requireMigration = true;
-
-						if (compatResult.getConvertDeserializer() == null) {
-							// one of the field serializers cannot provide a fallback deserializer
-							return CompatibilityResult.requiresMigration();
-						} else {
-							convertDeserializers[i] =
-								new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
-						}
-					}
+	/**
+	 * A snapshot for {@link RowSerializer}.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+	 *             It is fully replaced by {@link RowSerializerSnapshot}.
+	 */
+	@Deprecated
+	public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot<Row> {
 
-					i++;
-				}
+		private static final int VERSION = 1;
 
-				if (requireMigration) {
-					return CompatibilityResult.requiresMigration(new RowSerializer(convertDeserializers));
-				} else {
-					return CompatibilityResult.compatible();
-				}
-			}
+		/**
+		 * This empty nullary constructor is required for deserializing the configuration.
+		 */
+		public RowSerializerConfigSnapshot() {
+		}
+
+		public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) {
+			super(fieldSerializers);
 		}
 
-		return CompatibilityResult.requiresMigration();
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<Row> resolveSchemaCompatibility(TypeSerializer<Row> newSerializer) {
+			TypeSerializerSnapshot<?>[] nestedSnapshots = getNestedSerializersAndConfigs()
+				.stream()
+				.map(t -> t.f1)
+				.toArray(TypeSerializerSnapshot[]::new);
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new RowSerializerSnapshot(),
+				nestedSnapshots);
+		}
 	}
 
-	public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot<Row> {
+	/**
+	 * A {@link TypeSerializerSnapshot} for RowSerializer.
+	 */
+	public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot<Row, RowSerializer> {
 
-		private static final int VERSION = 1;
+		private static final int VERSION = 2;
 
-		/** This empty nullary constructor is required for deserializing the configuration. */
-		public RowSerializerConfigSnapshot() {}
+		@SuppressWarnings("WeakerAccess")
+		public RowSerializerSnapshot() {
+			super(RowSerializer.class);
+		}
 
-		public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) {
-			super(fieldSerializers);
+		RowSerializerSnapshot(RowSerializer serializerInstance) {
+			super(serializerInstance);
 		}
 
 		@Override
-		public int getVersion() {
+		protected int getCurrentOuterSnapshotVersion() {
 			return VERSION;
 		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(RowSerializer outerSerializer) {
+			return outerSerializer.fieldSerializers;
+		}
+
+		@Override
+		protected RowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new RowSerializer(nestedSerializers);
+		}
 	}
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
new file mode 100644
index 0000000..7a7888a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer.RowSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+import org.apache.flink.types.Row;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * State migration test for {@link RowSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class RowSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Row> {
+
+	public RowSerializerMigrationTest(TestSpecification<Row> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"row-serializer",
+			RowSerializer.class,
+			RowSerializerSnapshot.class,
+			RowSerializerMigrationTest::stringLongRowSupplier);
+
+		return testSpecifications.get();
+	}
+
+	private static TypeSerializer<Row> stringLongRowSupplier() {
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+		return rowTypeInfo.createSerializer(new ExecutionConfig());
+	}
+}
diff --git a/flink-core/src/test/resources/flink-1.6-row-serializer-data b/flink-core/src/test/resources/flink-1.6-row-serializer-data
new file mode 100644
index 0000000..2cd0bd3
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-row-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot
new file mode 100644
index 0000000..9200e23
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-row-serializer-data b/flink-core/src/test/resources/flink-1.7-row-serializer-data
new file mode 100644
index 0000000..41aa078
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-row-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot
new file mode 100644
index 0000000..f632dc4
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot differ


[flink] 09/18: [FLINK-11329] [DataStream] Migrate BufferEntrySerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a51adc3886d62109b1b7582733ab91e6e175a58c
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 20:54:23 2019 +0100

    [FLINK-11329] [DataStream] Migrate BufferEntrySerializer to use new compatibility API
---
 .../api/operators/co/IntervalJoinOperator.java     | 103 +++++++++++++--------
 .../co/BufferEntrySerializerMigrationTest.java     |  57 ++++++++++++
 .../flink-1.6-buffer-entry-serializer-data         | Bin 0 -> 160 bytes
 .../flink-1.6-buffer-entry-serializer-snapshot     | Bin 0 -> 935 bytes
 .../flink-1.7-buffer-entry-serializer-data         | Bin 0 -> 160 bytes
 .../flink-1.7-buffer-entry-serializer-snapshot     | Bin 0 -> 936 bytes
 6 files changed, 123 insertions(+), 37 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
index 43085cb..8f79c17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@@ -22,17 +22,15 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -361,7 +359,9 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 	 * This will contain the element itself along with a flag indicating
 	 * if it has been joined or not.
 	 */
-	private static class BufferEntry<T> {
+	@Internal
+	@VisibleForTesting
+	static class BufferEntry<T> {
 
 		private final T element;
 		private final boolean hasBeenJoined;
@@ -375,13 +375,15 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 	/**
 	 * A {@link TypeSerializer serializer} for the {@link BufferEntry}.
 	 */
-	private static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
+	@Internal
+	@VisibleForTesting
+	static class BufferEntrySerializer<T> extends TypeSerializer<BufferEntry<T>> {
 
 		private static final long serialVersionUID = -20197698803836236L;
 
 		private final TypeSerializer<T> elementSerializer;
 
-		private BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
+		BufferEntrySerializer(TypeSerializer<T> elementSerializer) {
 			this.elementSerializer = Preconditions.checkNotNull(elementSerializer);
 		}
 
@@ -464,40 +466,19 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot snapshotConfiguration() {
-			return new BufferSerializerConfigSnapshot<>(elementSerializer);
-		}
-
-		@Override
-		public CompatibilityResult<BufferEntry<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-			if (configSnapshot instanceof BufferSerializerConfigSnapshot) {
-				Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousSerializerAndConfig =
-						((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
-
-				CompatibilityResult<T> compatResult =
-						CompatibilityUtil.resolveCompatibilityResult(
-								previousSerializerAndConfig.f0,
-								UnloadableDummyTypeSerializer.class,
-								previousSerializerAndConfig.f1,
-								elementSerializer);
-
-				if (!compatResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				} else if (compatResult.getConvertDeserializer() != null) {
-					return CompatibilityResult.requiresMigration(
-							new BufferEntrySerializer<>(
-									new TypeDeserializerAdapter<>(
-											compatResult.getConvertDeserializer())));
-				}
-			}
-			return CompatibilityResult.requiresMigration();
+		public TypeSerializerSnapshot<BufferEntry<T>> snapshotConfiguration() {
+			return new BufferEntrySerializerSnapshot<>(this);
 		}
 	}
 
 	/**
 	 * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+	 *             It is fully replaced by {@link BufferEntrySerializerSnapshot}.
 	 */
-	public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+	@Deprecated
+	public static class BufferSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<BufferEntry<T>> {
 
 		private static final int VERSION = 1;
 
@@ -512,6 +493,54 @@ public class IntervalJoinOperator<K, T1, T2, OUT>
 		public int getVersion() {
 			return VERSION;
 		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<BufferEntry<T>> resolveSchemaCompatibility(TypeSerializer<BufferEntry<T>> newSerializer) {
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new BufferEntrySerializerSnapshot<>(),
+				getSingleNestedSerializerAndConfig().f1);
+		}
+	}
+
+	/**
+	 * A {@link TypeSerializerSnapshot} for {@link BufferEntrySerializer}.
+	 */
+	public static final class BufferEntrySerializerSnapshot<T>
+		extends CompositeTypeSerializerSnapshot<BufferEntry<T>, BufferEntrySerializer<T>> {
+
+		private static final int VERSION = 2;
+
+		@SuppressWarnings({"unused", "WeakerAccess"})
+		public BufferEntrySerializerSnapshot() {
+			super(correspondingSerializerClass());
+		}
+
+		BufferEntrySerializerSnapshot(BufferEntrySerializer<T> serializerInstance) {
+			super(serializerInstance);
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(BufferEntrySerializer<T> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.elementSerializer};
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		protected BufferEntrySerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new BufferEntrySerializer<>((TypeSerializer<T>) nestedSerializers[0]);
+		}
+
+		@SuppressWarnings("unchecked")
+		private static <T> Class<BufferEntrySerializer<T>> correspondingSerializerClass() {
+			return (Class<BufferEntrySerializer<T>>) (Class<?>) BufferEntrySerializer.class;
+		}
 	}
 
 	@VisibleForTesting
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
new file mode 100644
index 0000000..d4d2673
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntry;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializer;
+import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * State migration tests for {@link BufferEntrySerializer}.
+ */
+@RunWith(Parameterized.class)
+public class BufferEntrySerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<BufferEntry<String>> {
+
+	public BufferEntrySerializerMigrationTest(TestSpecification<BufferEntry<String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"buffer-entry-serializer",
+			BufferEntrySerializer.class,
+			BufferEntrySerializerSnapshot.class,
+			() -> new BufferEntrySerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data
new file mode 100644
index 0000000..a4af1fc
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot
new file mode 100644
index 0000000..6141180
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data
new file mode 100644
index 0000000..36c9dc7
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot
new file mode 100644
index 0000000..af92e1b
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot differ


[flink] 05/18: [hotfix] [e2e] Add missing serialVersionUIDs to state TTL e2e test classes

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d575951cff8c24c8a5ceb9e4dd44c9c0429aaee8
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 30 16:20:56 2019 +0800

    [hotfix] [e2e] Add missing serialVersionUIDs to state TTL e2e test classes
---
 .../org/apache/flink/streaming/tests/verify/ValueWithTs.java     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
index 7070c6d..3574591 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
@@ -30,6 +30,9 @@ import java.io.Serializable;
 
 /** User state value with timestamps before and after update. */
 public class ValueWithTs<V> implements Serializable {
+
+	private static final long serialVersionUID = -8941625260587401383L;
+
 	private final V value;
 	private final long timestamp;
 
@@ -57,6 +60,8 @@ public class ValueWithTs<V> implements Serializable {
 	/** Serializer for Serializer. */
 	public static class Serializer extends CompositeSerializer<ValueWithTs<?>> {
 
+		private static final long serialVersionUID = -7300352863212438745L;
+
 		public Serializer(TypeSerializer<?> valueSerializer, TypeSerializer<Long> timestampSerializer) {
 			super(true, valueSerializer, timestampSerializer);
 		}
@@ -143,8 +148,8 @@ public class ValueWithTs<V> implements Serializable {
 		@Override
 		protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
 			TypeSerializer<?> valueSerializer = nestedSerializers[0];
-			TypeSerializer<Long> timeSerializer = (TypeSerializer<Long>) nestedSerializers[1];
-			return new Serializer(valueSerializer, timeSerializer);
+			TypeSerializer<Long> timestampSerializer = (TypeSerializer<Long>) nestedSerializers[1];
+			return new Serializer(valueSerializer, timestampSerializer);
 		}
 	}
 }


[flink] 17/18: [hotfix] [javadocs] Fix typo in RpcService

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e0aa1518bb875653c018a2d7d2b0bd3b2d090252
Author: libenchao <li...@gmail.com>
AuthorDate: Mon Jan 28 16:07:38 2019 +0800

    [hotfix] [javadocs] Fix typo in RpcService
    
    This closes #7585.
---
 .../src/main/java/org/apache/flink/runtime/rpc/RpcService.java          | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 9aa3119..ff4cc5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -67,7 +67,7 @@ public interface RpcService {
 		Class<C> clazz);
 
 	/**
-	 * Connect to ta remote fenced rpc server under the provided address. Returns a fenced rpc gateway
+	 * Connect to a remote fenced rpc server under the provided address. Returns a fenced rpc gateway
 	 * which can be used to communicate with the rpc server. If the connection failed, then the
 	 * returned future is failed with a {@link RpcConnectionException}.
 	 *


[flink] 15/18: [FLINK-11322] [kafka] Use try-with-resource for short-living connections in Kafka connectors

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 64cbf2a20178cf8b4f643ba510653881a3e5ce2f
Author: Fokko Driesprong <fo...@godatadriven.com>
AuthorDate: Tue Jan 15 00:41:13 2019 +0100

    [FLINK-11322] [kafka] Use try-with-resource for short-living connections in Kafka connectors
    
    This closes #7488.
---
 .../connectors/kafka/FlinkKafkaConsumer010.java    | 16 ++++++------
 .../streaming/connectors/kafka/Kafka08ITCase.java  | 29 +++++++++++-----------
 .../connectors/kafka/FlinkKafkaConsumer.java       | 17 ++++++-------
 3 files changed, 31 insertions(+), 31 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 3508d6d..0831516 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -243,20 +243,20 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 				timestamp);
 		}
 
+		final Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
+
 		// use a short-lived consumer to fetch the offsets;
 		// this is ok because this is a one-time operation that happens only on startup
-		KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties);
-
-		Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
-		for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
+		try (KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties)) {
+			for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
 				consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
 
-			result.put(
-				new KafkaTopicPartition(partitionToOffset.getKey().topic(), partitionToOffset.getKey().partition()),
-				(partitionToOffset.getValue() == null) ? null : partitionToOffset.getValue().offset());
+				result.put(
+					new KafkaTopicPartition(partitionToOffset.getKey().topic(), partitionToOffset.getKey().partition()),
+					(partitionToOffset.getValue() == null) ? null : partitionToOffset.getValue().offset());
+			}
 		}
 
-		consumer.close();
 		return result;
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 5af219e..a250d86 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -83,9 +83,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1);
 
 		// set invalid offset:
-		CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient();
-		ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
-		curatorClient.close();
+		try (CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient()) {
+			ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorClient, standardProps.getProperty("group.id"), topic, 0, 1234);
+		}
 
 		// read from topic
 		final int valuesCount = 20;
@@ -165,14 +165,14 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 			final Long offset = (long) (Math.random() * Long.MAX_VALUE);
 
-			CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient();
-			kafkaServer.createTestTopic(topicName, 3, 2);
-
-			ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
+			final Long fetchedOffset;
+			try (CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient()) {
+				kafkaServer.createTestTopic(topicName, 3, 2);
 
-			Long fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
+				ZookeeperOffsetHandler.setOffsetInZooKeeper(curatorFramework, groupId, topicName, 0, offset);
 
-			curatorFramework.close();
+				fetchedOffset = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, groupId, topicName, 0);
+			}
 
 			assertEquals(offset, fetchedOffset);
 		}
@@ -209,13 +209,14 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		// read so that the offset can be committed to ZK
 		readSequence(env, StartupMode.GROUP_OFFSETS, null, null, readProps, parallelism, topicName, 100, 0);
 
+		final Long o1, o2, o3;
 		// get the offset
-		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient();
+		try (CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl) kafkaServer).createCuratorClient()) {
+			o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+			o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+			o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
+		}
 
-		Long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
-		Long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
-		Long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
-		curatorFramework.close();
 		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
 		// ensure that the offset has been committed
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index ddbe421..e6f1638 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -274,20 +274,19 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
 				timestamp);
 		}
 
+		final Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
 		// use a short-lived consumer to fetch the offsets;
 		// this is ok because this is a one-time operation that happens only on startup
-		KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties);
+		try (KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties)) {
+			for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
+				consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
 
-		Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
-		for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
-			consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
+				result.put(
+					new KafkaTopicPartition(partitionToOffset.getKey().topic(), partitionToOffset.getKey().partition()),
+					(partitionToOffset.getValue() == null) ? null : partitionToOffset.getValue().offset());
+			}
 
-			result.put(
-				new KafkaTopicPartition(partitionToOffset.getKey().topic(), partitionToOffset.getKey().partition()),
-				(partitionToOffset.getValue() == null) ? null : partitionToOffset.getValue().offset());
 		}
-
-		consumer.close();
 		return result;
 	}
 


[flink] 16/18: [hotfix] [docs] Fix typo in best_practices

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f0cfdf1b4f24134dcebf6efe6729895da408b1c
Author: Tony Feng <33...@qq.com>
AuthorDate: Tue Jan 29 22:14:37 2019 +0800

    [hotfix] [docs] Fix typo in best_practices
    
    This closes #7594.
---
 docs/dev/best_practices.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/dev/best_practices.md b/docs/dev/best_practices.md
index daf4aaf..6bd564b 100644
--- a/docs/dev/best_practices.md
+++ b/docs/dev/best_practices.md
@@ -40,7 +40,7 @@ Please note that you don't have to use the `ParameterTool` described here. Other
 
 ### Getting your configuration values into the `ParameterTool`
 
-The `ParameterTool` provides a set of predefined static methods for reading the configuration. The tool is internally expecting a `Map<String, String>`, so its very easy to integrate it with your own configuration style.
+The `ParameterTool` provides a set of predefined static methods for reading the configuration. The tool is internally expecting a `Map<String, String>`, so it's very easy to integrate it with your own configuration style.
 
 
 #### From `.properties` files


[flink] 18/18: [hotfix] [rocksdb] Fix typo in RocksDB state migration exception message

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b72b1f338469faf90036b82c93fd6e381fd2699
Author: Cristian <me...@cristian.io>
AuthorDate: Tue Jan 22 13:12:42 2019 -0800

    [hotfix] [rocksdb] Fix typo in RocksDB state migration exception message
    
    This closes #7558.
---
 .../org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 4ac912c..d915da9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -210,7 +210,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K
 			V value = priorSerializer.deserialize(serializedOldValueInput);
 			newSerializer.serialize(value, serializedMigratedValueOutput);
 		} catch (Exception e) {
-			throw new StateMigrationException("Error while trying to migration RocksDB state.", e);
+			throw new StateMigrationException("Error while trying to migrate RocksDB state.", e);
 		}
 	}
 


[flink] 01/18: [hotfix][cep] Close nfa in CepOperator only if not null

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3bfffbf1773ca2edebffd826fb2bb44b3d38e065
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jan 22 09:47:23 2019 +0100

    [hotfix][cep] Close nfa in CepOperator only if not null
---
 .../src/main/java/org/apache/flink/cep/operator/CepOperator.java      | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 357fbc7..8cc84c9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -226,7 +226,9 @@ public class CepOperator<IN, KEY, OUT>
 	@Override
 	public void close() throws Exception {
 		super.close();
-		nfa.close();
+		if (nfa != null) {
+			nfa.close();
+		}
 	}
 
 	@Override


[flink] 11/18: [FLINK-11329] [scala] Migrating ScalaOptionSerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a34d46344dccfb7ba34e00337e7c0270cf09e416
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Mon Jan 28 16:56:50 2019 +0100

    [FLINK-11329] [scala] Migrating ScalaOptionSerializer to use new compatibility API
---
 .../ScalaOptionSerializerConfigSnapshot.java       |  11 ++++
 .../typeutils/ScalaOptionSerializerSnapshot.java   |  62 +++++++++++++++++++++
 .../api/scala/typeutils/OptionSerializer.scala     |  41 +-------------
 ...ScalaOptionSerializerSnapshotMigrationTest.java |  58 +++++++++++++++++++
 .../flink-1.6-scala-option-serializer-data         | Bin 0 -> 31 bytes
 .../flink-1.6-scala-option-serializer-snapshot     | Bin 0 -> 876 bytes
 .../flink-1.7-scala-option-serializer-data         | Bin 0 -> 31 bytes
 .../flink-1.7-scala-option-serializer-snapshot     | Bin 0 -> 877 bytes
 8 files changed, 133 insertions(+), 39 deletions(-)

diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
index 215bd44..e6bc88c 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -19,8 +19,10 @@
 package org.apache.flink.api.scala.typeutils;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 
 import scala.Option;
 
@@ -31,6 +33,7 @@ import scala.Option;
  * allow calling different base class constructors from subclasses, while we need that
  * for the default empty constructor.
  */
+@Deprecated
 public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Option<E>> {
 
 	private static final int VERSION = 1;
@@ -46,4 +49,12 @@ public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeS
 	public int getVersion() {
 		return VERSION;
 	}
+
+	@Override
+	public TypeSerializerSchemaCompatibility<Option<E>> resolveSchemaCompatibility(TypeSerializer<Option<E>> newSerializer) {
+		return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+			newSerializer,
+			new ScalaOptionSerializerSnapshot<>(),
+			getSingleNestedSerializerAndConfig().f1);
+	}
 }
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
new file mode 100644
index 0000000..dfa9178
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import scala.Option;
+
+/**
+ * A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
+ */
+public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Option<E>, OptionSerializer<E>> {
+
+	private static final int VERSION = 2;
+
+	@SuppressWarnings("WeakerAccess")
+	public ScalaOptionSerializerSnapshot() {
+		super(underlyingClass());
+	}
+
+	public ScalaOptionSerializerSnapshot(OptionSerializer<E> serializerInstance) {
+		super(serializerInstance);
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return VERSION;
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(OptionSerializer<E> outerSerializer) {
+		return new TypeSerializer[]{outerSerializer.elemSerializer()};
+	}
+
+	@Override
+	protected OptionSerializer<E> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked") TypeSerializer<E> nestedSerializer = (TypeSerializer<E>) nestedSerializers[0];
+		return new OptionSerializer<>(nestedSerializer);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <E> Class<OptionSerializer<E>> underlyingClass() {
+		return (Class<OptionSerializer<E>>) (Class<?>) OptionSerializer.class;
+	}
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 7f3aa8c..ea8f22a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = {
-    new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
-  }
-
-  override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = {
-
-    configSnapshot match {
-      case optionSerializerConfigSnapshot
-          : ScalaOptionSerializerConfigSnapshot[A] =>
-        ensureCompatibilityInternal(optionSerializerConfigSnapshot)
-      case legacyOptionSerializerConfigSnapshot
-          : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
-        ensureCompatibilityInternal(legacyOptionSerializerConfigSnapshot)
-      case _ => CompatibilityResult.requiresMigration()
-    }
-  }
-
-  private def ensureCompatibilityInternal(
-      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot[Option[A]])
-      : CompatibilityResult[Option[A]] = {
-
-    val compatResult = CompatibilityUtil.resolveCompatibilityResult(
-      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
-      classOf[UnloadableDummyTypeSerializer[_]],
-      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
-      elemSerializer)
-
-    if (compatResult.isRequiresMigration) {
-      if (compatResult.getConvertDeserializer != null) {
-        CompatibilityResult.requiresMigration(
-          new OptionSerializer[A](
-            new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
-      } else {
-        CompatibilityResult.requiresMigration()
-      }
-    } else {
-      CompatibilityResult.compatible()
-    }
+  override def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]] = {
+    new ScalaOptionSerializerSnapshot[A](this)
   }
 }
 
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..efc94ec
--- /dev/null
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+import scala.Option;
+
+/**
+ * Migration test for the {@link ScalaEitherSerializerSnapshot}.
+ */
+@RunWith(Parameterized.class)
+public class ScalaOptionSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Option<String>> {
+
+	private static final String SPEC_NAME = "scala-option-serializer";
+
+	public ScalaOptionSerializerSnapshotMigrationTest(TestSpecification<Option<String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			OptionSerializer.class,
+			ScalaOptionSerializerSnapshot.class,
+			() -> new OptionSerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data
new file mode 100644
index 0000000..3cdb252
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data differ
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot
new file mode 100644
index 0000000..d7be0c2
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data
new file mode 100644
index 0000000..3cdb252
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot
new file mode 100644
index 0000000..bdedf0e
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot differ


[flink] 08/18: [FLINK-11329] [DataStream] Migrating the UnionSerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 392f2cd57380f9070414f4d6d120e4b051d4e061
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 16:40:40 2019 +0100

    [FLINK-11329] [DataStream] Migrating the UnionSerializer to use new compatibility API
---
 .../streaming/api/datastream/CoGroupedStreams.java | 110 +++++++++++++--------
 .../datastream/UnionSerializerMigrationTest.java   |  67 +++++++++++++
 .../test/resources/flink-1.6-union-serializer-data |   1 +
 .../resources/flink-1.6-union-serializer-snapshot  | Bin 0 -> 1393 bytes
 .../test/resources/flink-1.7-union-serializer-data |   1 +
 .../resources/flink-1.7-union-serializer-snapshot  | Bin 0 -> 1403 bytes
 6 files changed, 138 insertions(+), 41 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 19d9783..3ef1861 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -25,14 +25,13 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -504,7 +503,9 @@ public class CoGroupedStreams<T1, T2> {
 		}
 	}
 
-	private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
+	@VisibleForTesting
+	@Internal
+	static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
 		private static final long serialVersionUID = 1L;
 
 		private final TypeSerializer<T1> oneSerializer;
@@ -618,63 +619,90 @@ public class CoGroupedStreams<T1, T2> {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
-			return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
-		}
-
-		@Override
-		public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-			if (configSnapshot instanceof UnionSerializerConfigSnapshot) {
-				List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
-					((UnionSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
-
-				CompatibilityResult<T1> oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousSerializersAndConfigs.get(0).f0,
-					UnloadableDummyTypeSerializer.class,
-					previousSerializersAndConfigs.get(0).f1,
-					oneSerializer);
-
-				CompatibilityResult<T2> twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousSerializersAndConfigs.get(1).f0,
-					UnloadableDummyTypeSerializer.class,
-					previousSerializersAndConfigs.get(1).f1,
-					twoSerializer);
-
-				if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				} else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) {
-					return CompatibilityResult.requiresMigration(
-						new UnionSerializer<>(
-							new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
-							new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer())));
-				}
-			}
-
-			return CompatibilityResult.requiresMigration();
+		public TypeSerializerSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
+			return new UnionSerializerSnapshot<>(this);
 		}
 	}
 
 	/**
 	 * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+	 *             It is fully replaced by {@link UnionSerializerSnapshot}.
 	 */
+	@Deprecated
 	public static class UnionSerializerConfigSnapshot<T1, T2>
-			extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {
+		extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {
 
 		private static final int VERSION = 1;
 
-		/** This empty nullary constructor is required for deserializing the configuration. */
-		public UnionSerializerConfigSnapshot() {}
+		/**
+		 * This empty nullary constructor is required for deserializing the configuration.
+		 */
+		public UnionSerializerConfigSnapshot() {
+		}
 
 		public UnionSerializerConfigSnapshot(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) {
 			super(oneSerializer, twoSerializer);
 		}
 
 		@Override
+		public TypeSerializerSchemaCompatibility<TaggedUnion<T1, T2>> resolveSchemaCompatibility(TypeSerializer<TaggedUnion<T1, T2>> newSerializer) {
+			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new UnionSerializerSnapshot<>(),
+				nestedSerializersAndConfigs.get(0).f1,
+				nestedSerializersAndConfigs.get(1).f1
+			);
+		}
+
+		@Override
 		public int getVersion() {
 			return VERSION;
 		}
 	}
 
+	/**
+	 * The {@link TypeSerializerSnapshot} for the {@link UnionSerializer}.
+	 */
+	public static class UnionSerializerSnapshot<T1, T2>
+		extends CompositeTypeSerializerSnapshot<TaggedUnion<T1, T2>, UnionSerializer<T1, T2>> {
+
+		private static final int VERSION = 2;
+
+		@SuppressWarnings("WeakerAccess")
+		public UnionSerializerSnapshot() {
+			super(correspondingSerializerClass());
+		}
+
+		UnionSerializerSnapshot(UnionSerializer<T1, T2> serializerInstance) {
+			super(serializerInstance);
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(UnionSerializer<T1, T2> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.oneSerializer, outerSerializer.twoSerializer};
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		protected UnionSerializer<T1, T2> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new UnionSerializer<>((TypeSerializer<T1>) nestedSerializers[0], (TypeSerializer<T2>) nestedSerializers[1]);
+		}
+
+		@SuppressWarnings("unchecked")
+		private static <T1, T2> Class<UnionSerializer<T1, T2>> correspondingSerializerClass() {
+			return (Class<UnionSerializer<T1, T2>>) (Class<?>) UnionSerializer.class;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utility functions that implement the CoGroup logic based on the tagged
 	//  union window reduce
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java
new file mode 100644
index 0000000..2b85d7e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * State migration tests for {@link UnionSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class UnionSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<TaggedUnion<String, Long>> {
+
+	public UnionSerializerMigrationTest(TestSpecification<TaggedUnion<String, Long>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"union-serializer",
+			UnionSerializer.class,
+			UnionSerializerSnapshot.class,
+			UnionSerializerMigrationTest::stringLongRowSupplier);
+
+		return testSpecifications.get();
+	}
+
+	private static TypeSerializer<TaggedUnion<String, Long>> stringLongRowSupplier() {
+		return new UnionSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE);
+	}
+
+}
+
+
+
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data
new file mode 100644
index 0000000..cb29a99
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data
@@ -0,0 +1 @@
+53778725243338537787315927955377873178348653778731961974537787321387515377873232740853778732510096537787327007985377873288617553778733069270
\ No newline at end of file
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot
new file mode 100644
index 0000000..178d007
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data
new file mode 100644
index 0000000..d2f04fd
--- /dev/null
+++ b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data
@@ -0,0 +1 @@
+53711258937562537112614539985371126164162753711261837231537112620123325371126218543153711262413321537112625282785371126263382753711262735393
\ No newline at end of file
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot
new file mode 100644
index 0000000..747eb82
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot differ


[flink] 03/18: [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4e607e91db6d46ca8cad360dd761cd2eeb56d733
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 23 10:39:17 2019 +0100

    [FLINK-11328][core] Migrate NFAStateSerializer to new TypeSerializer interface
    
    This closes #7566.
---
 .../apache/flink/cep/nfa/NFAStateSerializer.java   | 101 ++++++++++++++-------
 .../flink/cep/nfa/NFAStateSerializerSnapshot.java  |  75 +++++++++++++++
 .../org/apache/flink/cep/operator/CepOperator.java |   6 +-
 .../cep/NFASerializerSnapshotsMigrationTest.java   |   8 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java     |   2 +-
 .../resources/flink-1.6-nfa-state-serializer-data  | Bin 0 -> 1460 bytes
 .../flink-1.6-nfa-state-serializer-snapshot        | Bin 0 -> 362 bytes
 .../resources/flink-1.7-nfa-state-serializer-data  | Bin 0 -> 1460 bytes
 .../flink-1.7-nfa-state-serializer-snapshot        | Bin 0 -> 350 bytes
 9 files changed, 152 insertions(+), 40 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index 7d79a36..0c19036 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa;
 
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -33,21 +32,42 @@ import org.apache.flink.types.StringValue;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.PriorityQueue;
 import java.util.Queue;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A {@link TypeSerializer} for {@link NFAState} that uses Java Serialization.
+ * A {@link TypeSerializer} for {@link NFAState}.
  */
 public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 
 	private static final long serialVersionUID = 2098282423980597010L;
 
-	public static final NFAStateSerializer INSTANCE = new NFAStateSerializer();
+	/**
+	 * NOTE: this field should actually be final.
+	 * The reason that it isn't final is due to backward compatible deserialization
+	 * paths. See {@link #readObject(ObjectInputStream)}.
+	 */
+	private TypeSerializer<DeweyNumber> versionSerializer;
+	private TypeSerializer<NodeId> nodeIdSerializer;
+	private TypeSerializer<EventId> eventIdSerializer;
+
+	public NFAStateSerializer() {
+		this.versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
+		this.eventIdSerializer = EventId.EventIdSerializer.INSTANCE;
+		this.nodeIdSerializer = new NodeId.NodeIdSerializer();
+	}
 
-	private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
-	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = new NodeId.NodeIdSerializer();
-	private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;
+	NFAStateSerializer(
+			final TypeSerializer<DeweyNumber> versionSerializer,
+			final TypeSerializer<NodeId> nodeIdSerializer,
+			final TypeSerializer<EventId> eventIdSerializer) {
+		this.versionSerializer = checkNotNull(versionSerializer);
+		this.nodeIdSerializer = checkNotNull(nodeIdSerializer);
+		this.eventIdSerializer = checkNotNull(eventIdSerializer);
+	}
 
 	@Override
 	public boolean isImmutableType() {
@@ -55,11 +75,6 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 	}
 
 	@Override
-	public NFAStateSerializer duplicate() {
-		return new NFAStateSerializer();
-	}
-
-	@Override
 	public NFAState createInstance() {
 		return null;
 	}
@@ -130,24 +145,25 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		return true;
 	}
 
-	// -----------------------------------------------------------------------------------
-
 	@Override
 	public TypeSerializerSnapshot<NFAState> snapshotConfiguration() {
-		return new NFAStateSerializerSnapshot();
+		return new NFAStateSerializerSnapshot(this);
 	}
 
-	private NFAStateSerializer() {
+	/*
+		Getters for internal serializers to use in NFAStateSerializerSnapshot.
+	 */
+
+	TypeSerializer<DeweyNumber> getVersionSerializer() {
+		return versionSerializer;
 	}
 
-	/**
-	 * Serializer configuration snapshot for compatibility and format evolution.
-	 */
-	@SuppressWarnings("WeakerAccess")
-	public static final class NFAStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<NFAState> {
-		public NFAStateSerializerSnapshot() {
-			super(() -> INSTANCE);
-		}
+	TypeSerializer<NodeId> getNodeIdSerializer() {
+		return nodeIdSerializer;
+	}
+
+	TypeSerializer<EventId> getEventIdSerializer() {
+		return eventIdSerializer;
 	}
 
 	/*
@@ -177,16 +193,16 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 			DataOutputView target) throws IOException {
 
 		StringValue.writeString(computationState.getCurrentStateName(), target);
-		NODE_ID_SERIALIZER.serialize(computationState.getPreviousBufferEntry(), target);
-		VERSION_SERIALIZER.serialize(computationState.getVersion(), target);
+		nodeIdSerializer.serialize(computationState.getPreviousBufferEntry(), target);
+		versionSerializer.serialize(computationState.getVersion(), target);
 		target.writeLong(computationState.getStartTimestamp());
 		serializeStartEvent(computationState.getStartEventID(), target);
 	}
 
 	private ComputationState deserializeSingleComputationState(DataInputView source) throws IOException {
 		String stateName = StringValue.readString(source);
-		NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
-		DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
+		NodeId prevState = nodeIdSerializer.deserialize(source);
+		DeweyNumber version = versionSerializer.deserialize(source);
 		long startTimestamp = source.readLong();
 
 		EventId startEventId = deserializeStartEvent(source);
@@ -200,10 +216,10 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 
 	private void copySingleComputationState(DataInputView source, DataOutputView target) throws IOException {
 		StringValue.copyString(source, target);
-		NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
-		NODE_ID_SERIALIZER.serialize(prevState, target);
-		DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
-		VERSION_SERIALIZER.serialize(version, target);
+		NodeId prevState = nodeIdSerializer.deserialize(source);
+		nodeIdSerializer.serialize(prevState, target);
+		DeweyNumber version = versionSerializer.deserialize(source);
+		versionSerializer.serialize(version, target);
 		long startTimestamp = source.readLong();
 		target.writeLong(startTimestamp);
 
@@ -213,7 +229,7 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 	private void serializeStartEvent(EventId startEventID, DataOutputView target) throws IOException {
 		if (startEventID != null) {
 			target.writeByte(1);
-			EVENT_ID_SERIALIZER.serialize(startEventID, target);
+			eventIdSerializer.serialize(startEventID, target);
 		} else {
 			target.writeByte(0);
 		}
@@ -223,7 +239,7 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		byte isNull = source.readByte();
 		EventId startEventId = null;
 		if (isNull == 1) {
-			startEventId = EVENT_ID_SERIALIZER.deserialize(source);
+			startEventId = eventIdSerializer.deserialize(source);
 		}
 		return startEventId;
 	}
@@ -233,8 +249,23 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		target.writeByte(isNull);
 
 		if (isNull == 1) {
-			EventId startEventId = EVENT_ID_SERIALIZER.deserialize(source);
-			EVENT_ID_SERIALIZER.serialize(startEventId, target);
+			EventId startEventId = eventIdSerializer.deserialize(source);
+			eventIdSerializer.serialize(startEventId, target);
+		}
+	}
+
+	/*
+	* Backwards compatible deserializing of NFAStateSerializer.
+	*/
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		// the nested serializer will be null if this was read from a savepoint taken with versions
+		// lower than Flink 1.7; in this case, we explicitly create instance for the nested serializer.
+		if (versionSerializer == null || nodeIdSerializer == null || eventIdSerializer == null) {
+			this.versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
+			this.eventIdSerializer = EventId.EventIdSerializer.INSTANCE;
+			this.nodeIdSerializer = new NodeId.NodeIdSerializer();
 		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
new file mode 100644
index 0000000..e58cbea
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializerSnapshot.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
+
+/**
+ * Snapshot class for {@link NFAStateSerializer}.
+ */
+public class NFAStateSerializerSnapshot extends CompositeTypeSerializerSnapshot<NFAState, NFAStateSerializer> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	/**
+	 * Constructor for read instantiation.
+	 */
+	public NFAStateSerializerSnapshot() {
+		super(NFAStateSerializer.class);
+	}
+
+	/**
+	 * Constructor to create the snapshot for writing.
+	 */
+	public NFAStateSerializerSnapshot(NFAStateSerializer serializerInstance) {
+		super(serializerInstance);
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(NFAStateSerializer outerSerializer) {
+		TypeSerializer<DeweyNumber> versionSerializer = outerSerializer.getVersionSerializer();
+		TypeSerializer<NodeId> nodeIdSerializer = outerSerializer.getNodeIdSerializer();
+		TypeSerializer<EventId> eventIdSerializer = outerSerializer.getEventIdSerializer();
+
+		return new TypeSerializer[]{versionSerializer, nodeIdSerializer, eventIdSerializer};
+	}
+
+	@Override
+	protected NFAStateSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+
+		@SuppressWarnings("unchecked")
+		TypeSerializer<DeweyNumber> versionSerializer = (TypeSerializer<DeweyNumber>) nestedSerializers[0];
+
+		@SuppressWarnings("unchecked")
+		TypeSerializer<NodeId> nodeIdSerializer = (TypeSerializer<NodeId>) nestedSerializers[1];
+
+		@SuppressWarnings("unchecked")
+		TypeSerializer<EventId> eventIdSerializer = (TypeSerializer<EventId>) nestedSerializers[2];
+
+		return new NFAStateSerializer(versionSerializer, nodeIdSerializer, eventIdSerializer);
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 8cc84c9..ca74e12 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -171,9 +171,9 @@ public class CepOperator<IN, KEY, OUT>
 
 		// initializeState through the provided context
 		computationStates = context.getKeyedStateStore().getState(
-				new ValueStateDescriptor<>(
-						NFA_STATE_NAME,
-						NFAStateSerializer.INSTANCE));
+			new ValueStateDescriptor<>(
+				NFA_STATE_NAME,
+				new NFAStateSerializer()));
 
 		partialMatches = new SharedBuffer<>(context.getKeyedStateStore(), inputSerializer);
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java
index 0d8d999..b5074b2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.cep;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.cep.nfa.NFAStateSerializer;
+import org.apache.flink.cep.nfa.NFAStateSerializerSnapshot;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
@@ -41,7 +43,6 @@ public class NFASerializerSnapshotsMigrationTest extends TypeSerializerSnapshotM
 		super(testSpecification);
 	}
 
-	@SuppressWarnings("unchecked")
 	@Parameterized.Parameters(name = "Test Specification = {0}")
 	public static Collection<TestSpecification<?>> testSpecifications() {
 
@@ -72,6 +73,11 @@ public class NFASerializerSnapshotsMigrationTest extends TypeSerializerSnapshotM
 			SharedBufferNode.SharedBufferNodeSerializer.class,
 			SharedBufferNode.SharedBufferNodeSerializer.SharedBufferNodeSerializerSnapshot.class,
 			SharedBufferNode.SharedBufferNodeSerializer::new);
+		testSpecifications.add(
+			"nfa-state-serializer",
+			NFAStateSerializer.class,
+			NFAStateSerializerSnapshot.class,
+			NFAStateSerializer::new);
 
 		return testSpecifications.get();
 	}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 638b8b5..afc4661 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -295,7 +295,7 @@ public class NFATest extends TestLogger {
 			nfaTestHarness.consumeRecord(new StreamRecord<>(d, 7));
 			nfaTestHarness.consumeRecord(new StreamRecord<>(a, 8));
 
-			NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
+			NFAStateSerializer serializer = new NFAStateSerializer();
 
 			//serialize
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-nfa-state-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.6-nfa-state-serializer-data
new file mode 100644
index 0000000..6505244
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-nfa-state-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-nfa-state-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.6-nfa-state-serializer-snapshot
new file mode 100644
index 0000000..6d5fa94
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-nfa-state-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-nfa-state-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.7-nfa-state-serializer-data
new file mode 100644
index 0000000..6505244
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-nfa-state-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-nfa-state-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.7-nfa-state-serializer-snapshot
new file mode 100644
index 0000000..3f928a0
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-nfa-state-serializer-snapshot differ


[flink] 06/18: [FLINK-11329] [core] Migrating the NullableSerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b22e91358fd2e58fc064968953a73218e87cfc88
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 15:27:50 2019 +0100

    [FLINK-11329] [core] Migrating the NullableSerializer to use new compatibility API
---
 .../java/typeutils/runtime/NullableSerializer.java | 179 +++++++++++++++------
 .../TypeSerializerSnapshotMigrationTestBase.java   |  63 +++++++-
 .../runtime/NullableSerializerMigrationTest.java   |  83 ++++++++++
 .../flink-1.6-nullable-not-padded-serializer-data  | Bin 0 -> 58 bytes
 ...ink-1.6-nullable-not-padded-serializer-snapshot | Bin 0 -> 944 bytes
 .../flink-1.6-nullable-padded-serializer-data      | Bin 0 -> 90 bytes
 .../flink-1.6-nullable-padded-serializer-snapshot  | Bin 0 -> 952 bytes
 .../flink-1.7-nullable-not-padded-serializer-data  | Bin 0 -> 58 bytes
 ...ink-1.7-nullable-not-padded-serializer-snapshot | Bin 0 -> 941 bytes
 .../flink-1.7-nullable-padded-serializer-data      | Bin 0 -> 90 bytes
 .../flink-1.7-nullable-padded-serializer-snapshot  | Bin 0 -> 949 bytes
 11 files changed, 270 insertions(+), 55 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
index fe392e4..05941fa 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -19,24 +19,23 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Serializer wrapper to add support of {@code null} value serialization.
@@ -65,12 +64,15 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 	private final byte[] padding;
 
 	private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) {
-		this.originalSerializer = originalSerializer;
-		this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen);
+		this(originalSerializer, createPadding(originalSerializer.getLength(), padNullValueIfFixedLen));
+	}
 
+	private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, byte[] padding) {
+		this.originalSerializer = originalSerializer;
+		this.padding = padding;
 	}
 
-	private static <T> byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
+	private static byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) {
 		boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen;
 		return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY;
 	}
@@ -79,7 +81,7 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 	 * This method tries to serialize {@code null} value with the {@code originalSerializer}
 	 * and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}.
 	 *
-	 * @param originalSerializer serializer to wrap and add {@code null} support
+	 * @param originalSerializer     serializer to wrap and add {@code null} support
 	 * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
 	 * @return serializer which supports {@code null} values
 	 */
@@ -99,22 +101,24 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		DataOutputSerializer dos = new DataOutputSerializer(length);
 		try {
 			serializer.serialize(null, dos);
-		} catch (IOException | RuntimeException e) {
+		}
+		catch (IOException | RuntimeException e) {
 			return false;
 		}
-		Preconditions.checkArgument(
+		checkArgument(
 			serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length,
 			"The serialized form of the null value should have the same length " +
 				"as any other if the length is fixed in the serializer");
 		DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer());
 		try {
-			Preconditions.checkArgument(serializer.deserialize(dis) == null);
-		} catch (IOException e) {
+			checkArgument(serializer.deserialize(dis) == null);
+		}
+		catch (IOException e) {
 			throw new RuntimeException(
 				String.format("Unexpected failure to deserialize just serialized null value with %s",
 					serializer.getClass().getName()), e);
 		}
-		Preconditions.checkArgument(
+		checkArgument(
 			serializer.copy(null) == null,
 			"Serializer %s has to be able properly copy null value if it can serialize it",
 			serializer.getClass().getName());
@@ -125,10 +129,18 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		return padding.length > 0;
 	}
 
+	private int nullPaddingLength() {
+		return padding.length;
+	}
+
+	private TypeSerializer<T> originalSerializer() {
+		return originalSerializer;
+	}
+
 	/**
 	 * This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped.
 	 *
-	 * @param originalSerializer serializer to wrap and add {@code null} support
+	 * @param originalSerializer     serializer to wrap and add {@code null} support
 	 * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer
 	 * @return wrapped serializer which supports {@code null} values
 	 */
@@ -176,7 +188,8 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		if (record == null) {
 			target.writeBoolean(true);
 			target.write(padding);
-		} else {
+		}
+		else {
 			target.writeBoolean(false);
 			originalSerializer.serialize(record, target);
 		}
@@ -209,7 +222,8 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		target.writeBoolean(isNull);
 		if (isNull) {
 			target.write(padding);
-		} else {
+		}
+		else {
 			originalSerializer.copy(source, target);
 		}
 	}
@@ -233,45 +247,29 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public NullableSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new NullableSerializerConfigSnapshot<>(originalSerializer);
+	public TypeSerializerSnapshot<T> snapshotConfiguration() {
+		return new NullableSerializerSnapshot<>(this);
 	}
 
-	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof NullableSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
-				((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
-			CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-				previousKvSerializersAndConfigs.get(0).f0,
-				UnloadableDummyTypeSerializer.class,
-				previousKvSerializersAndConfigs.get(0).f1,
-				originalSerializer);
-
-			if (!compatResult.isRequiresMigration()) {
-				return CompatibilityResult.compatible();
-			} else if (compatResult.getConvertDeserializer() != null) {
-				return CompatibilityResult.requiresMigration(
-					new NullableSerializer<>(
-						new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue()));
-			}
-		}
-
-		return CompatibilityResult.requiresMigration();
-	}
 
 	/**
 	 * Configuration snapshot for serializers of nullable types, containing the
 	 * configuration snapshot of its original serializer.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only for
+	 *             backwards compatibility purposes. It is fully replaced
+	 *             by {@link NullableSerializerSnapshot}.
 	 */
+	@Deprecated
 	@Internal
-	public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+	public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<T> {
 		private static final int VERSION = 1;
 
-		/** This empty nullary constructor is required for deserializing the configuration. */
-		@SuppressWarnings("unused")
-		public NullableSerializerConfigSnapshot() {}
+		/**
+		 * This empty nullary constructor is required for deserializing the configuration.
+		 */
+		public NullableSerializerConfigSnapshot() {
+		}
 
 		NullableSerializerConfigSnapshot(TypeSerializer<T> originalSerializer) {
 			super(originalSerializer);
@@ -281,5 +279,88 @@ public class NullableSerializer<T> extends TypeSerializer<T> {
 		public int getVersion() {
 			return VERSION;
 		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+			NullableSerializer<T> previousSerializer = (NullableSerializer<T>) restoreSerializer();
+			NullableSerializerSnapshot<T> newCompositeSnapshot = new NullableSerializerSnapshot<>(previousSerializer.nullPaddingLength());
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				newCompositeSnapshot,
+				getSingleNestedSerializerAndConfig().f1
+			);
+		}
 	}
+
+	/**
+	 * Snapshot for serializers of nullable types, containing the
+	 * snapshot of its original serializer.
+	 */
+	@SuppressWarnings({"unchecked", "WeakerAccess"})
+	public static class NullableSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<T, NullableSerializer<T>> {
+
+		private static final int VERSION = 2;
+		private int nullPaddingLength;
+
+		@SuppressWarnings("unused")
+		public NullableSerializerSnapshot() {
+			super(serializerClass());
+		}
+
+		public NullableSerializerSnapshot(NullableSerializer<T> serializerInstance) {
+			super(serializerInstance);
+			this.nullPaddingLength = serializerInstance.nullPaddingLength();
+		}
+
+		private NullableSerializerSnapshot(int nullPaddingLength) {
+			super(serializerClass());
+			checkArgument(nullPaddingLength >= 0,
+				"Computed NULL padding can not be negative. %d",
+				nullPaddingLength);
+
+			this.nullPaddingLength = nullPaddingLength;
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(NullableSerializer<T> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.originalSerializer()};
+		}
+
+		@Override
+		protected NullableSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			checkState(nullPaddingLength >= 0,
+				"Negative padding size after serializer construction: %d",
+				nullPaddingLength);
+
+			final byte[] padding = (nullPaddingLength == 0) ? EMPTY_BYTE_ARRAY : new byte[nullPaddingLength];
+			TypeSerializer<T> nestedSerializer = (TypeSerializer<T>) nestedSerializers[0];
+			return new NullableSerializer<>(nestedSerializer, padding);
+		}
+
+		@Override
+		protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+			out.writeInt(nullPaddingLength);
+		}
+
+		@Override
+		protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			nullPaddingLength = in.readInt();
+		}
+
+		@Override
+		protected boolean isOuterSnapshotCompatible(NullableSerializer<T> newSerializer) {
+			return nullPaddingLength == newSerializer.nullPaddingLength();
+		}
+
+		private static <T> Class<NullableSerializer<T>> serializerClass() {
+			return (Class<NullableSerializer<T>>) (Class<?>) NullableSerializer.class;
+		}
+	}
+
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 57c939d..2e363fb 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.TestLogger;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
-
 import org.junit.Test;
 
 import java.io.IOException;
@@ -89,9 +88,11 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		TypeSerializer<ElementT> serializer = snapshot.restoreSerializer();
 
 		DataInputView input = dataUnderTest();
+
+		final Matcher<ElementT> matcher = testSpecification.testDataElementMatcher;
 		for (int i = 0; i < testSpecification.testDataCount; i++) {
 			final ElementT result = serializer.deserialize(input);
-			assertThat(result, notNullValue());
+			assertThat(result, matcher);
 		}
 	}
 
@@ -193,6 +194,10 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 	// Test Specification
 	// --------------------------------------------------------------------------------------------------------------
 
+	/**
+	 * Test Specification.
+	 */
+	@SuppressWarnings("WeakerAccess")
 	protected static final class TestSpecification<T> {
 		private final Class<? extends TypeSerializer<T>> serializerType;
 		private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass;
@@ -205,6 +210,9 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		private int testDataCount;
 
 		@SuppressWarnings("unchecked")
+		private Matcher<T> testDataElementMatcher = (Matcher<T>) notNullValue();
+
+		@SuppressWarnings("unchecked")
 		public static <T> TestSpecification<T> builder(
 			String name,
 			Class<? extends TypeSerializer> serializerClass,
@@ -253,6 +261,11 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return this;
 		}
 
+		public TestSpecification<T> withTestDataMatcher(Matcher<T> matcher) {
+			testDataElementMatcher = matcher;
+			return this;
+		}
+
 		private TypeSerializer<T> createSerializer() {
 			try {
 				return (serializerProvider == null) ? serializerType.newInstance() : serializerProvider.get();
@@ -274,10 +287,6 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return testMigrationVersion;
 		}
 
-		public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
-			return snapshotClass;
-		}
-
 		@Override
 		public String toString() {
 			return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());
@@ -344,6 +353,45 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		/**
 		 * Adds a test specification to be tested for all specified test versions.
 		 *
+		 * <p>This method adds the specification with pre-defined snapshot and data filenames,
+		 * with the format "flink-&lt;testVersion&gt;-&lt;specName&gt;-&lt;data/snapshot&gt;",
+		 * and each specification's test data count is assumed to always be 10.
+		 *
+		 * @param name test specification name.
+		 * @param serializerClass class of the current serializer.
+		 * @param snapshotClass class of the current serializer snapshot class.
+		 * @param serializerProvider provider for an instance of the current serializer.
+		 * @param elementMatcher an {@code hamcrest} matcher to match test data.
+		 *
+		 * @param <T> type of the test data.
+		 */
+		public <T> void add(
+			String name,
+			Class<? extends TypeSerializer> serializerClass,
+			Class<? extends TypeSerializerSnapshot> snapshotClass,
+			Supplier<? extends TypeSerializer<T>> serializerProvider,
+			Matcher<T> elementMatcher)  {
+			for (MigrationVersion testVersion : testVersions) {
+				testSpecifications.add(
+					TestSpecification.<T>builder(
+						getSpecNameForVersion(name, testVersion),
+						serializerClass,
+						snapshotClass,
+						testVersion)
+						.withNewSerializerProvider(serializerProvider)
+						.withSnapshotDataLocation(
+							String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name))
+						.withTestData(
+							String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
+							DEFAULT_TEST_DATA_COUNT)
+					.withTestDataMatcher(elementMatcher)
+				);
+			}
+		}
+
+		/**
+		 * Adds a test specification to be tested for all specified test versions.
+		 *
 		 * @param name test specification name.
 		 * @param serializerClass class of the current serializer.
 		 * @param snapshotClass class of the current serializer snapshot class.
@@ -385,6 +433,9 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		}
 	}
 
+	/**
+	 * Supplier of paths based on {@link MigrationVersion}.
+	 */
 	protected interface TestResourceFilenameSupplier {
 		String get(MigrationVersion testVersion);
 	}
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java
new file mode 100644
index 0000000..1da7da7
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer.NullableSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * {@link NullableSerializer} migration test.
+ */
+@RunWith(Parameterized.class)
+public class NullableSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Long> {
+
+	public NullableSerializerMigrationTest(TestSpecification<Long> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"nullable-padded-serializer",
+			NullableSerializer.class,
+			NullableSerializerSnapshot.class,
+			() -> NullableSerializer.wrap(LongSerializer.INSTANCE, true),
+			NULL_OR_LONG);
+
+		testSpecifications.add(
+			"nullable-not-padded-serializer",
+			NullableSerializer.class,
+			NullableSerializerSnapshot.class,
+			() -> NullableSerializer.wrap(LongSerializer.INSTANCE, false),
+			NULL_OR_LONG);
+
+		return testSpecifications.get();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static final Matcher<Long> NULL_OR_LONG = new NullableMatcher();
+
+	private static final class NullableMatcher extends BaseMatcher<Long> {
+
+		@Override
+		public boolean matches(Object item) {
+			return item == null || item instanceof Long;
+		}
+
+		@Override
+		public void describeTo(Description description) {
+			description.appendText("a null or a long");
+		}
+	}
+
+}
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data
new file mode 100644
index 0000000..ca6d29f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot
new file mode 100644
index 0000000..8c8b27f
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data
new file mode 100644
index 0000000..42439f7
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot
new file mode 100644
index 0000000..9bd3af7
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data
new file mode 100644
index 0000000..aecf5b8
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot
new file mode 100644
index 0000000..a2e0807
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data
new file mode 100644
index 0000000..426221e
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data differ
diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot
new file mode 100644
index 0000000..685f8dd
Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot differ


[flink] 10/18: [FLINK-11329] [DataStream] Migrate StreamElementSerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 17618f323a2fafcf66022e3a66e56a2d84ce5cb6
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Mon Jan 28 16:32:14 2019 +0100

    [FLINK-11329] [DataStream] Migrate StreamElementSerializer to use new compatibility API
---
 .../streamrecord/StreamElementSerializer.java      |  93 ++++++++++++---------
 .../StreamElementSerializerMigrationTest.java      |  55 ++++++++++++
 .../flink-1.6-stream-element-serializer-data       | Bin 0 -> 158 bytes
 .../flink-1.6-stream-element-serializer-snapshot   | Bin 0 -> 931 bytes
 .../flink-1.7-stream-element-serializer-data       | Bin 0 -> 158 bytes
 .../flink-1.7-stream-element-serializer-snapshot   | Bin 0 -> 932 bytes
 6 files changed, 109 insertions(+), 39 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index f84ff15a..e8af0f9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -19,15 +19,11 @@
 package org.apache.flink.streaming.runtime.streamrecord;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -283,56 +279,75 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public StreamElementSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new StreamElementSerializerConfigSnapshot<>(typeSerializer);
+	public StreamElementSerializerSnapshot<T> snapshotConfiguration() {
+		return new StreamElementSerializerSnapshot<>(this);
 	}
 
-	@Override
-	public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousTypeSerializerAndConfig;
+	/**
+	 * Configuration snapshot specific to the {@link StreamElementSerializer}.
+	 * @deprecated see {@link StreamElementSerializerSnapshot}.
+	 */
+	@Deprecated
+	public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {
 
-		// we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer
-		if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
-			previousTypeSerializerAndConfig =
-				((StreamElementSerializerConfigSnapshot<?>) configSnapshot).getSingleNestedSerializerAndConfig();
-		} else {
-			return CompatibilityResult.requiresMigration();
-		}
+		private static final int VERSION = 1;
 
-		CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
-				previousTypeSerializerAndConfig.f0,
-				UnloadableDummyTypeSerializer.class,
-				previousTypeSerializerAndConfig.f1,
-				typeSerializer);
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public StreamElementSerializerConfigSnapshot() {}
 
-		if (!compatResult.isRequiresMigration()) {
-			return CompatibilityResult.compatible();
-		} else if (compatResult.getConvertDeserializer() != null) {
-			return CompatibilityResult.requiresMigration(
-				new StreamElementSerializer<>(
-					new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
-		} else {
-			return CompatibilityResult.requiresMigration();
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<StreamElement> resolveSchemaCompatibility(TypeSerializer<StreamElement> newSerializer) {
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new StreamElementSerializerSnapshot<>(),
+				getSingleNestedSerializerAndConfig().f1);
 		}
 	}
 
 	/**
 	 * Configuration snapshot specific to the {@link StreamElementSerializer}.
 	 */
-	public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {
+	public static final class StreamElementSerializerSnapshot<T>
+		extends CompositeTypeSerializerSnapshot<StreamElement, StreamElementSerializer<T>> {
 
-		private static final int VERSION = 1;
+		private static final int VERSION = 2;
 
-		/** This empty nullary constructor is required for deserializing the configuration. */
-		public StreamElementSerializerConfigSnapshot() {}
+		@SuppressWarnings("WeakerAccess")
+		public StreamElementSerializerSnapshot() {
+			super(serializerClass());
+		}
 
-		public StreamElementSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
-			super(typeSerializer);
+		StreamElementSerializerSnapshot(StreamElementSerializer<T> serializerInstance) {
+			super(serializerInstance);
 		}
 
 		@Override
-		public int getVersion() {
+		protected int getCurrentOuterSnapshotVersion() {
 			return VERSION;
 		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(StreamElementSerializer<T> outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.getContainedTypeSerializer()};
+		}
+
+		@Override
+		protected StreamElementSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			@SuppressWarnings("unchecked")
+			TypeSerializer<T> casted = (TypeSerializer<T>) nestedSerializers[0];
+
+			return new StreamElementSerializer<>(casted);
+		}
+
+		@SuppressWarnings("unchecked")
+		private static <T> Class<StreamElementSerializer<T>> serializerClass() {
+			Class<?> streamElementSerializerClass = StreamElementSerializer.class;
+			return (Class<StreamElementSerializer<T>>) streamElementSerializerClass;
+		}
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java
new file mode 100644
index 0000000..b6169d4
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.StreamElementSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration tests for {@link StreamElementSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class StreamElementSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<StreamElement> {
+
+	public StreamElementSerializerMigrationTest(TestSpecification<StreamElement> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"stream-element-serializer",
+			StreamElementSerializer.class,
+			StreamElementSerializerSnapshot.class,
+			() -> new StreamElementSerializer<>(StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data
new file mode 100644
index 0000000..81b80c3
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot
new file mode 100644
index 0000000..8ffdb43
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data
new file mode 100644
index 0000000..01f05e7
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot
new file mode 100644
index 0000000..dc7f76b
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot differ


[flink] 13/18: [FLINK-11329] [DataStream] Migrate TwoPhaseCommitSinkFunction.StateSerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 519210177e626b71a944a80ac06179933a95f1f4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 30 17:12:15 2019 +0800

    [FLINK-11329] [DataStream] Migrate TwoPhaseCommitSinkFunction.StateSerializer to use new compatibility API
---
 .../functions/sink/TwoPhaseCommitSinkFunction.java | 112 +++++++++++++--------
 ...haseCommitSinkStateSerializerMigrationTest.java |  61 +++++++++++
 ...1.6-two-phase-commit-sink-state-serializer-data | Bin 0 -> 530 bytes
 ...two-phase-commit-sink-state-serializer-snapshot | Bin 0 -> 1430 bytes
 ...1.7-two-phase-commit-sink-state-serializer-data | Bin 0 -> 530 bytes
 ...two-phase-commit-sink-state-serializer-snapshot | Bin 0 -> 1438 bytes
 6 files changed, 132 insertions(+), 41 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index e393354..fc8e4c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -24,15 +24,13 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -788,42 +786,8 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot<State<TXN, CONTEXT>> snapshotConfiguration() {
-			return new StateSerializerConfigSnapshot<>(transactionSerializer, contextSerializer);
-		}
-
-		@Override
-		public CompatibilityResult<State<TXN, CONTEXT>> ensureCompatibility(
-				TypeSerializerConfigSnapshot<?> configSnapshot) {
-			if (configSnapshot instanceof StateSerializerConfigSnapshot) {
-				List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
-						((StateSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
-
-				CompatibilityResult<TXN> txnCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-						previousSerializersAndConfigs.get(0).f0,
-						UnloadableDummyTypeSerializer.class,
-						previousSerializersAndConfigs.get(0).f1,
-						transactionSerializer);
-
-				CompatibilityResult<CONTEXT> contextCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-						previousSerializersAndConfigs.get(1).f0,
-						UnloadableDummyTypeSerializer.class,
-						previousSerializersAndConfigs.get(1).f1,
-						contextSerializer);
-
-				if (!txnCompatResult.isRequiresMigration() && !contextCompatResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				} else {
-					if (txnCompatResult.getConvertDeserializer() != null && contextCompatResult.getConvertDeserializer() != null) {
-						return CompatibilityResult.requiresMigration(
-								new StateSerializer<>(
-										new TypeDeserializerAdapter<>(txnCompatResult.getConvertDeserializer()),
-										new TypeDeserializerAdapter<>(contextCompatResult.getConvertDeserializer())));
-					}
-				}
-			}
-
-			return CompatibilityResult.requiresMigration();
+		public StateSerializerSnapshot<TXN, CONTEXT> snapshotConfiguration() {
+			return new StateSerializerSnapshot<>(this);
 		}
 	}
 
@@ -831,8 +795,13 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 	 * {@link TypeSerializerConfigSnapshot} for sink state. This has to be public so that
 	 * it can be deserialized/instantiated, should not be used anywhere outside
 	 * {@code TwoPhaseCommitSinkFunction}.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only
+	 *             for backwards compatibility purposes. It is fully replaced by
+	 *             {@link StateSerializerSnapshot}.
 	 */
 	@Internal
+	@Deprecated
 	public static final class StateSerializerConfigSnapshot<TXN, CONTEXT>
 			extends CompositeTypeSerializerConfigSnapshot<State<TXN, CONTEXT>> {
 
@@ -851,5 +820,66 @@ public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 		public int getVersion() {
 			return VERSION;
 		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<State<TXN, CONTEXT>> resolveSchemaCompatibility(
+				TypeSerializer<State<TXN, CONTEXT>> newSerializer) {
+
+			final TypeSerializerSnapshot<?>[] nestedSnapshots = getNestedSerializersAndConfigs()
+				.stream()
+				.map(t -> t.f1)
+				.toArray(TypeSerializerSnapshot[]::new);
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new StateSerializerSnapshot<>(),
+				nestedSnapshots
+			);
+		}
+	}
+
+	/**
+	 * Snapshot for the {@link StateSerializer}.
+	 */
+	@Internal
+	public static final class StateSerializerSnapshot<TXN, CONTEXT>
+			extends CompositeTypeSerializerSnapshot<State<TXN, CONTEXT>, StateSerializer<TXN, CONTEXT>> {
+
+		private static final int VERSION = 2;
+
+		@SuppressWarnings("WeakerAccess")
+		public StateSerializerSnapshot() {
+			super(correspondingSerializerClass());
+		}
+
+		StateSerializerSnapshot(StateSerializer<TXN, CONTEXT> serializerInstance) {
+			super(serializerInstance);
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected StateSerializer<TXN, CONTEXT> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			@SuppressWarnings("unchecked")
+			final TypeSerializer<TXN> transactionSerializer = (TypeSerializer<TXN>) nestedSerializers[0];
+
+			@SuppressWarnings("unchecked")
+			final TypeSerializer<CONTEXT> contextSerializer = (TypeSerializer<CONTEXT>) nestedSerializers[1];
+
+			return new StateSerializer<>(transactionSerializer, contextSerializer);
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(StateSerializer<TXN, CONTEXT> outerSerializer) {
+			return new TypeSerializer<?>[] { outerSerializer.transactionSerializer, outerSerializer.contextSerializer };
+		}
+
+		@SuppressWarnings("unchecked")
+		private static <TXN, CONTEXT> Class<StateSerializer<TXN, CONTEXT>> correspondingSerializerClass() {
+			return (Class<StateSerializer<TXN, CONTEXT>>) (Class<?>) StateSerializer.class;
+		}
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkStateSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkStateSerializerMigrationTest.java
new file mode 100644
index 0000000..f1e3dd8
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkStateSerializerMigrationTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration test for {@link TwoPhaseCommitSinkFunction.StateSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class TwoPhaseCommitSinkStateSerializerMigrationTest
+		extends TypeSerializerSnapshotMigrationTestBase<TwoPhaseCommitSinkFunction.State<Integer, String>> {
+
+	public TwoPhaseCommitSinkStateSerializerMigrationTest(
+			TestSpecification<TwoPhaseCommitSinkFunction.State<Integer, String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"two-phase-commit-sink-state-serializer",
+			TwoPhaseCommitSinkFunction.StateSerializer.class,
+			TwoPhaseCommitSinkFunction.StateSerializerSnapshot.class,
+			TwoPhaseCommitSinkStateSerializerMigrationTest::intStringStateSerializerSupplier);
+
+		return testSpecifications.get();
+	}
+
+	private static TypeSerializer<TwoPhaseCommitSinkFunction.State<Integer, String>> intStringStateSerializerSupplier() {
+		return new TwoPhaseCommitSinkFunction.StateSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE);
+	}
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-two-phase-commit-sink-state-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-two-phase-commit-sink-state-serializer-data
new file mode 100644
index 0000000..6d94356
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-two-phase-commit-sink-state-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-two-phase-commit-sink-state-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-two-phase-commit-sink-state-serializer-snapshot
new file mode 100644
index 0000000..e44e6be
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-two-phase-commit-sink-state-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-two-phase-commit-sink-state-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-two-phase-commit-sink-state-serializer-data
new file mode 100644
index 0000000..cd02200
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-two-phase-commit-sink-state-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-two-phase-commit-sink-state-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-two-phase-commit-sink-state-serializer-snapshot
new file mode 100644
index 0000000..6777a8e
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-two-phase-commit-sink-state-serializer-snapshot differ


[flink] 12/18: [FLINK-11329] [scala] Delegate compatibility check on old OptionSerializerConfigSnapshot to ScalaOptionSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 198ceb27751f95ee7f6f870eb4f700c3b97ff746
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 30 15:59:45 2019 +0800

    [FLINK-11329] [scala] Delegate compatibility check on old OptionSerializerConfigSnapshot to ScalaOptionSerializerSnapshot
    
    This closes #7590.
---
 .../scala/typeutils/ScalaOptionSerializerConfigSnapshot.java   |  3 +++
 .../api/scala/typeutils/ScalaOptionSerializerSnapshot.java     |  3 ++-
 .../apache/flink/api/scala/typeutils/OptionSerializer.scala    | 10 ++++++++++
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
index e6bc88c..3e5bd95 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -32,6 +32,9 @@ import scala.Option;
  * <p>This configuration snapshot class is implemented in Java because Scala does not
  * allow calling different base class constructors from subclasses, while we need that
  * for the default empty constructor.
+ *
+ * @deprecated this snapshot class is no longer in use, and is maintained only for backwards compatibility.
+ *             It is fully replaced by {@link ScalaOptionSerializerSnapshot}.
  */
 @Deprecated
 public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Option<E>> {
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
index dfa9178..47a72a2 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java
@@ -20,11 +20,12 @@ package org.apache.flink.api.scala.typeutils;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 
 import scala.Option;
 
 /**
- * A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
+ * A {@link TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
  */
 public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Option<E>, OptionSerializer<E>> {
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index ea8f22a..d41d2e8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -116,6 +116,16 @@ object OptionSerializer {
       extends CompositeTypeSerializerConfigSnapshot[Option[A]] {
 
     override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
+
+    override def resolveSchemaCompatibility(
+        newSerializer: TypeSerializer[Option[A]]
+    ): TypeSerializerSchemaCompatibility[Option[A]] = {
+      CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+        newSerializer,
+        new ScalaOptionSerializerSnapshot[A](),
+        getSingleNestedSerializerAndConfig.f1
+      )
+    }
   }
 
   object OptionSerializerConfigSnapshot {


[flink] 02/18: [hotfix][cep] Split NFAStateSerializer methods into smaller ones.

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a8b8d80bab037bade0239547b6385d009a95168
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 23 10:39:03 2019 +0100

    [hotfix][cep] Split NFAStateSerializer methods into smaller ones.
---
 .../apache/flink/cep/nfa/NFAStateSerializer.java   | 163 ++++++++++++---------
 1 file changed, 97 insertions(+), 66 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index 7b83030..7d79a36 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -21,8 +21,6 @@ package org.apache.flink.cep.nfa;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
@@ -30,6 +28,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.types.StringValue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -44,11 +43,12 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 
 	private static final long serialVersionUID = 2098282423980597010L;
 
-	private NFAStateSerializer() {
-	}
-
 	public static final NFAStateSerializer INSTANCE = new NFAStateSerializer();
 
+	private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
+	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = new NodeId.NodeIdSerializer();
+	private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -92,35 +92,12 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		return -1;
 	}
 
-	private static final StringSerializer STATE_NAME_SERIALIZER = StringSerializer.INSTANCE;
-	private static final LongSerializer TIMESTAMP_SERIALIZER = LongSerializer.INSTANCE;
-	private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
-	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = new NodeId.NodeIdSerializer();
-	private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;
-
 	@Override
 	public void serialize(NFAState record, DataOutputView target) throws IOException {
 		serializeComputationStates(record.getPartialMatches(), target);
 		serializeComputationStates(record.getCompletedMatches(), target);
 	}
 
-	private void serializeComputationStates(Queue<ComputationState> states, DataOutputView target) throws IOException {
-		target.writeInt(states.size());
-		for (ComputationState computationState : states) {
-			STATE_NAME_SERIALIZER.serialize(computationState.getCurrentStateName(), target);
-			NODE_ID_SERIALIZER.serialize(computationState.getPreviousBufferEntry(), target);
-
-			VERSION_SERIALIZER.serialize(computationState.getVersion(), target);
-			TIMESTAMP_SERIALIZER.serialize(computationState.getStartTimestamp(), target);
-			if (computationState.getStartEventID() != null) {
-				target.writeByte(1);
-				EVENT_ID_SERIALIZER.serialize(computationState.getStartEventID(), target);
-			} else {
-				target.writeByte(0);
-			}
-		}
-	}
-
 	@Override
 	public NFAState deserialize(DataInputView source) throws IOException {
 		PriorityQueue<ComputationState> partialMatches = deserializeComputationStates(source);
@@ -128,27 +105,6 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		return new NFAState(partialMatches, completedMatches);
 	}
 
-	private PriorityQueue<ComputationState> deserializeComputationStates(DataInputView source) throws IOException {
-		PriorityQueue<ComputationState> computationStates = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
-
-		int computationStateNo = source.readInt();
-		for (int i = 0; i < computationStateNo; i++) {
-			String state = STATE_NAME_SERIALIZER.deserialize(source);
-			NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
-			DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
-			long startTimestamp = TIMESTAMP_SERIALIZER.deserialize(source);
-
-			byte isNull = source.readByte();
-			EventId startEventId = null;
-			if (isNull == 1) {
-				startEventId = EVENT_ID_SERIALIZER.deserialize(source);
-			}
-
-			computationStates.add(ComputationState.createState(state, prevState, version, startTimestamp, startEventId));
-		}
-		return computationStates;
-	}
-
 	@Override
 	public NFAState deserialize(NFAState reuse, DataInputView source) throws IOException {
 		return deserialize(source);
@@ -165,22 +121,7 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		target.writeInt(computationStateNo);
 
 		for (int i = 0; i < computationStateNo; i++) {
-			String state = STATE_NAME_SERIALIZER.deserialize(source);
-			STATE_NAME_SERIALIZER.serialize(state, target);
-			NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
-			NODE_ID_SERIALIZER.serialize(prevState, target);
-			DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
-			VERSION_SERIALIZER.serialize(version, target);
-			long startTimestamp = TIMESTAMP_SERIALIZER.deserialize(source);
-			TIMESTAMP_SERIALIZER.serialize(startTimestamp, target);
-
-			byte isNull = source.readByte();
-			target.writeByte(isNull);
-
-			if (isNull == 1) {
-				EventId startEventId = EVENT_ID_SERIALIZER.deserialize(source);
-				EVENT_ID_SERIALIZER.serialize(startEventId, target);
-			}
+			copySingleComputationState(source, target);
 		}
 	}
 
@@ -196,14 +137,104 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		return new NFAStateSerializerSnapshot();
 	}
 
+	private NFAStateSerializer() {
+	}
+
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
 	@SuppressWarnings("WeakerAccess")
 	public static final class NFAStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<NFAState> {
-
 		public NFAStateSerializerSnapshot() {
 			super(() -> INSTANCE);
 		}
 	}
+
+	/*
+		De/serialization methods
+	 */
+
+	private void serializeComputationStates(Queue<ComputationState> states, DataOutputView target) throws IOException {
+		target.writeInt(states.size());
+		for (ComputationState computationState : states) {
+			serializeSingleComputationState(computationState, target);
+		}
+	}
+
+	private PriorityQueue<ComputationState> deserializeComputationStates(DataInputView source) throws IOException {
+		PriorityQueue<ComputationState> computationStates = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
+
+		int computationStateNo = source.readInt();
+		for (int i = 0; i < computationStateNo; i++) {
+			final ComputationState computationState = deserializeSingleComputationState(source);
+			computationStates.add(computationState);
+		}
+		return computationStates;
+	}
+
+	private void serializeSingleComputationState(
+			ComputationState computationState,
+			DataOutputView target) throws IOException {
+
+		StringValue.writeString(computationState.getCurrentStateName(), target);
+		NODE_ID_SERIALIZER.serialize(computationState.getPreviousBufferEntry(), target);
+		VERSION_SERIALIZER.serialize(computationState.getVersion(), target);
+		target.writeLong(computationState.getStartTimestamp());
+		serializeStartEvent(computationState.getStartEventID(), target);
+	}
+
+	private ComputationState deserializeSingleComputationState(DataInputView source) throws IOException {
+		String stateName = StringValue.readString(source);
+		NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
+		DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
+		long startTimestamp = source.readLong();
+
+		EventId startEventId = deserializeStartEvent(source);
+
+		return ComputationState.createState(stateName,
+			prevState,
+			version,
+			startTimestamp,
+			startEventId);
+	}
+
+	private void copySingleComputationState(DataInputView source, DataOutputView target) throws IOException {
+		StringValue.copyString(source, target);
+		NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
+		NODE_ID_SERIALIZER.serialize(prevState, target);
+		DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
+		VERSION_SERIALIZER.serialize(version, target);
+		long startTimestamp = source.readLong();
+		target.writeLong(startTimestamp);
+
+		copyStartEvent(source, target);
+	}
+
+	private void serializeStartEvent(EventId startEventID, DataOutputView target) throws IOException {
+		if (startEventID != null) {
+			target.writeByte(1);
+			EVENT_ID_SERIALIZER.serialize(startEventID, target);
+		} else {
+			target.writeByte(0);
+		}
+	}
+
+	private EventId deserializeStartEvent(DataInputView source) throws IOException {
+		byte isNull = source.readByte();
+		EventId startEventId = null;
+		if (isNull == 1) {
+			startEventId = EVENT_ID_SERIALIZER.deserialize(source);
+		}
+		return startEventId;
+	}
+
+	private void copyStartEvent(DataInputView source, DataOutputView target) throws IOException {
+		byte isNull = source.readByte();
+		target.writeByte(isNull);
+
+		if (isNull == 1) {
+			EventId startEventId = EVENT_ID_SERIALIZER.deserialize(source);
+			EVENT_ID_SERIALIZER.serialize(startEventId, target);
+		}
+	}
 }


[flink] 14/18: [FLINK-11329] [DataStream] Migrate TimerSerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7abaff3516b4afd34e15c6c238fd1ebaae253325
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Jan 30 17:35:45 2019 +0800

    [FLINK-11329] [DataStream] Migrate TimerSerializer to use new compatibility API
---
 .../streaming/api/operators/TimerSerializer.java   |  67 ++++++++-------------
 .../api/operators/TimerSerializerSnapshot.java     |  66 ++++++++++++++++++++
 .../TimerSerializerSnapshotMigrationTest.java      |  61 +++++++++++++++++++
 .../test/resources/flink-1.6-timer-serializer-data | Bin 0 -> 240 bytes
 .../resources/flink-1.6-timer-serializer-snapshot  | Bin 0 -> 1406 bytes
 .../test/resources/flink-1.7-timer-serializer-data | Bin 0 -> 240 bytes
 .../resources/flink-1.7-timer-serializer-snapshot  | Bin 0 -> 1414 bytes
 7 files changed, 152 insertions(+), 42 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
index b641c09..fb8e0a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.MathUtils;
@@ -33,7 +30,6 @@ import org.apache.flink.util.MathUtils;
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Objects;
 
 /**
@@ -209,42 +205,8 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot<TimerHeapInternalTimer<K, N>> snapshotConfiguration() {
-		return new TimerSerializerConfigSnapshot<>(keySerializer, namespaceSerializer);
-	}
-
-	@Override
-	public CompatibilityResult<TimerHeapInternalTimer<K, N>> ensureCompatibility(
-		TypeSerializerConfigSnapshot configSnapshot) {
-
-		if (configSnapshot instanceof TimerSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
-				((TimerSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();
-
-			if (previousSerializersAndConfigs.size() == 2) {
-				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> keySerializerAndSnapshot =
-					previousSerializersAndConfigs.get(KEY_SERIALIZER_SNAPSHOT_INDEX);
-				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> namespaceSerializerAndSnapshot =
-					previousSerializersAndConfigs.get(NAMESPACE_SERIALIZER_SNAPSHOT_INDEX);
-				CompatibilityResult<K> keyCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-					keySerializerAndSnapshot.f0,
-					UnloadableDummyTypeSerializer.class,
-					keySerializerAndSnapshot.f1,
-					keySerializer);
-
-				CompatibilityResult<N> namespaceCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-					namespaceSerializerAndSnapshot.f0,
-					UnloadableDummyTypeSerializer.class,
-					namespaceSerializerAndSnapshot.f1,
-					namespaceSerializer);
-
-				if (!keyCompatibilityResult.isRequiresMigration()
-					&& !namespaceCompatibilityResult.isRequiresMigration()) {
-					return CompatibilityResult.compatible();
-				}
-			}
-		}
-		return CompatibilityResult.requiresMigration();
+	public TimerSerializerSnapshot<K, N> snapshotConfiguration() {
+		return new TimerSerializerSnapshot<>(this);
 	}
 
 	@Nonnull
@@ -262,7 +224,12 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 	 *
 	 * @param <K> type of key.
 	 * @param <N> type of namespace.
+	 *
+	 * @deprecated this snapshot class is no longer in use, and is maintained only
+	 *             for backwards compatibility purposes. It is fully replaced by
+	 *             {@link TimerSerializerSnapshot}.
 	 */
+	@Deprecated
 	public static class TimerSerializerConfigSnapshot<K, N> extends CompositeTypeSerializerConfigSnapshot<TimerHeapInternalTimer<K, N>> {
 
 		private static final int VERSION = 1;
@@ -289,5 +256,21 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 		public int getVersion() {
 			return VERSION;
 		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<TimerHeapInternalTimer<K, N>> resolveSchemaCompatibility(
+				TypeSerializer<TimerHeapInternalTimer<K, N>> newSerializer) {
+
+			final TypeSerializerSnapshot<?>[] nestedSnapshots = getNestedSerializersAndConfigs()
+				.stream()
+				.map(t -> t.f1)
+				.toArray(TypeSerializerSnapshot[]::new);
+
+			return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+				newSerializer,
+				new TimerSerializerSnapshot<>(),
+				nestedSnapshots
+			);
+		}
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
new file mode 100644
index 0000000..c28b045
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshot.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Snapshot class for the {@link TimerSerializer}.
+ */
+@Internal
+public class TimerSerializerSnapshot<K, N> extends CompositeTypeSerializerSnapshot<TimerHeapInternalTimer<K, N>, TimerSerializer<K, N>> {
+
+	private static final int VERSION = 2;
+
+	public TimerSerializerSnapshot() {
+		super(correspondingSerializerClass());
+	}
+
+	public TimerSerializerSnapshot(TimerSerializer<K, N> timerSerializer) {
+		super(timerSerializer);
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return VERSION;
+	}
+
+	@Override
+	protected TimerSerializer<K, N> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		final TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
+
+		@SuppressWarnings("unchecked")
+		final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) nestedSerializers[1];
+
+		return new TimerSerializer<K, N>(keySerializer, namespaceSerializer);
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(TimerSerializer<K, N> outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getNamespaceSerializer() };
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <K, N> Class<TimerSerializer<K, N>> correspondingSerializerClass() {
+		return (Class<TimerSerializer<K, N>>) (Class<?>) TimerSerializer.class;
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshotMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..6b07a09
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimerSerializerSnapshotMigrationTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration test for {@link TimerSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class TimerSerializerSnapshotMigrationTest
+	extends TypeSerializerSnapshotMigrationTestBase<TimerHeapInternalTimer<String, Integer>> {
+
+	public TimerSerializerSnapshotMigrationTest(
+		TestSpecification<TimerHeapInternalTimer<String, Integer>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"timer-serializer",
+			TimerSerializer.class,
+			TimerSerializerSnapshot.class,
+			TimerSerializerSnapshotMigrationTest::stringIntTimerSerializerSupplier);
+
+		return testSpecifications.get();
+	}
+
+	private static TypeSerializer<TimerHeapInternalTimer<String, Integer>> stringIntTimerSerializerSupplier() {
+		return new TimerSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE);
+	}
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-data
new file mode 100644
index 0000000..02f84d9
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-snapshot
new file mode 100644
index 0000000..fee77f2
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-timer-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-data
new file mode 100644
index 0000000..9069def
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-snapshot
new file mode 100644
index 0000000..d5f3fda
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-timer-serializer-snapshot differ


[flink] 04/18: [FLINK-11329] [DataStream] Migrating CompositeSerializer to use new compatibility API

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e49c3e0eb8e183c9a57cd532c571cdc971575b06
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 14:50:36 2019 +0100

    [FLINK-11329] [DataStream] Migrating CompositeSerializer to use new compatibility API
---
 .../api/common/typeutils/CompositeSerializer.java  |  16 +++--
 .../common/typeutils/CompositeSerializerTest.java  |   5 ++
 .../streaming/tests/TtlVerifyUpdateFunction.java   |   6 +-
 .../flink/streaming/tests/verify/ValueWithTs.java  |  60 +++++++++++++++--
 .../flink/runtime/state/ttl/TtlStateFactory.java   |  74 ++++++++++++++++++---
 .../state/ttl/TtlSerializerStateMigrationTest.java |  60 +++++++++++++++++
 .../test/resources/flink-1.6-ttl-serializer-data   | Bin 0 -> 230 bytes
 .../resources/flink-1.6-ttl-serializer-snapshot    | Bin 0 -> 1753 bytes
 .../test/resources/flink-1.7-ttl-serializer-data   | Bin 0 -> 230 bytes
 .../resources/flink-1.7-ttl-serializer-snapshot    | Bin 0 -> 1763 bytes
 10 files changed, 202 insertions(+), 19 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
index 2db7a30..8bc26a5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
@@ -205,12 +205,9 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
-		return new ConfigSnapshot(fieldSerializers);
-	}
-
-	@Override
 	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		// We can not remove this method, as long as we support restoring into CompositeTypeSerializerConfigSnapshot.
+		// Previously (pre 1.8), multiple composite serializers were using this class directly as their snapshot class.
 		if (configSnapshot instanceof ConfigSnapshot) {
 			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
 				((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
@@ -301,7 +298,14 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
 		}
 	}
 
-	/** Snapshot field serializers of composite type. */
+	/**
+	 * Snapshot field serializers of composite type.
+	 *
+	 * @deprecated this snapshot class is no longer in use by any serializers, and is only
+	 *             kept around for backwards compatibility. All subclass serializers should
+	 *             have their own serializer snapshot classes.
+	 */
+	@Deprecated
 	public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
 		private static final int VERSION = 0;
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java
index fc5c241..719ea49 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java
@@ -191,6 +191,11 @@ public class CompositeSerializerTest {
 			PrecomputedParameters precomputed, TypeSerializer<?>... originalSerializers) {
 			return new TestListCompositeSerializer(precomputed, originalSerializers);
 		}
+
+		@Override
+		public TypeSerializerSnapshot<List<Object>> snapshotConfiguration() {
+			throw new UnsupportedOperationException();
+		}
 	}
 
 	private static class CompositeSerializerTestInstance extends SerializerTestInstance<List<Object>> {
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
index f9e492e..18f6e35 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -143,7 +144,10 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String
 		prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream()
 			.collect(Collectors.toMap(TtlStateVerifier::getId, v -> {
 				checkNotNull(v);
-				TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer());
+				final TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(
+					v.getUpdateSerializer(),
+					LongSerializer.INSTANCE);
+
 				ListStateDescriptor<ValueWithTs<?>> stateDesc = new ListStateDescriptor<>(
 					"TtlPrevValueState_" + v.getId(), typeSerializer);
 				KeyedStateStore store = context.getKeyedStateStore();
diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
index a4f3080..7070c6d 100644
--- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
+++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java
@@ -19,8 +19,9 @@
 package org.apache.flink.streaming.tests.verify;
 
 import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nonnull;
@@ -56,8 +57,8 @@ public class ValueWithTs<V> implements Serializable {
 	/** Serializer for Serializer. */
 	public static class Serializer extends CompositeSerializer<ValueWithTs<?>> {
 
-		public Serializer(TypeSerializer<?> userValueSerializer) {
-			super(true, userValueSerializer, LongSerializer.INSTANCE);
+		public Serializer(TypeSerializer<?> valueSerializer, TypeSerializer<Long> timestampSerializer) {
+			super(true, valueSerializer, timestampSerializer);
 		}
 
 		@SuppressWarnings("unchecked")
@@ -92,7 +93,58 @@ public class ValueWithTs<V> implements Serializable {
 		protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance(
 				PrecomputedParameters precomputed,
 				TypeSerializer<?>... originalSerializers) {
-			return new Serializer(precomputed, (TypeSerializer<Object>) originalSerializers[0]);
+
+			return new Serializer(precomputed, originalSerializers[0], originalSerializers[1]);
+		}
+
+		TypeSerializer<?> getValueSerializer() {
+			return fieldSerializers[0];
+		}
+
+		@SuppressWarnings("unchecked")
+		TypeSerializer<Long> getTimestampSerializer() {
+			TypeSerializer<?> fieldSerializer = fieldSerializers[1];
+			return (TypeSerializer<Long>) fieldSerializer;
+		}
+
+		@Override
+		public TypeSerializerSnapshot<ValueWithTs<?>> snapshotConfiguration() {
+			return new ValueWithTsSerializerSnapshot(this);
+		}
+	}
+
+	/**
+	 * A {@link TypeSerializerSnapshot} for ValueWithTs Serializer.
+	 */
+	public static final class ValueWithTsSerializerSnapshot extends CompositeTypeSerializerSnapshot<ValueWithTs<?>, Serializer> {
+
+		private static final int VERSION = 2;
+
+		@SuppressWarnings("unused")
+		public ValueWithTsSerializerSnapshot() {
+			super(Serializer.class);
+		}
+
+		ValueWithTsSerializerSnapshot(Serializer serializerInstance) {
+			super(serializerInstance);
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(Serializer outerSerializer) {
+			return new TypeSerializer[]{outerSerializer.getValueSerializer(), outerSerializer.getTimestampSerializer()};
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			TypeSerializer<?> valueSerializer = nestedSerializers[0];
+			TypeSerializer<Long> timeSerializer = (TypeSerializer<Long>) nestedSerializers[1];
+			return new Serializer(valueSerializer, timeSerializer);
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 453f43a..6f05e1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -28,7 +28,9 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -126,7 +128,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
 	@SuppressWarnings("unchecked")
 	private IS createValueState() throws Exception {
 		ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
-			stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
+			stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
 		return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
 	}
 
@@ -134,7 +136,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
 	private <T> IS createListState() throws Exception {
 		ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
 		ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
-			stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
+			stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, listStateDesc.getElementSerializer()));
 		return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor));
 	}
 
@@ -144,7 +146,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
 		MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
 			stateDesc.getName(),
 			mapStateDesc.getKeySerializer(),
-			new TtlSerializer<>(mapStateDesc.getValueSerializer()));
+			new TtlSerializer<>(LongSerializer.INSTANCE, mapStateDesc.getValueSerializer()));
 		return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor));
 	}
 
@@ -154,7 +156,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
 		ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
 			stateDesc.getName(),
 			new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
-			new TtlSerializer<>(stateDesc.getSerializer()));
+			new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
 		return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor));
 	}
 
@@ -165,7 +167,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
 		TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
 			aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
 		AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
-			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
+			stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
 		return (IS) new TtlAggregatingState<>(createTtlStateContext(ttlDescriptor), ttlAggregateFunction);
 	}
 
@@ -178,7 +180,7 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
 			stateDesc.getName(),
 			ttlInitAcc,
 			new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
-			new TtlSerializer<>(stateDesc.getSerializer()));
+			new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
 		return (IS) new TtlFoldingState<>(createTtlStateContext(ttlDescriptor));
 	}
 
@@ -237,8 +239,8 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
 		private static final long serialVersionUID = 131020282727167064L;
 
 		@SuppressWarnings("WeakerAccess")
-		public TtlSerializer(TypeSerializer<T> userValueSerializer) {
-			super(true, LongSerializer.INSTANCE, userValueSerializer);
+		public TtlSerializer(TypeSerializer<Long> timestampSerializer, TypeSerializer<T> userValueSerializer) {
+			super(true, timestampSerializer, userValueSerializer);
 		}
 
 		@SuppressWarnings("WeakerAccess")
@@ -272,5 +274,61 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> {
 			Preconditions.checkArgument(originalSerializers.length == 2);
 			return new TtlSerializer<>(precomputed, originalSerializers);
 		}
+
+		@SuppressWarnings("unchecked")
+		TypeSerializer<Long> getTimestampSerializer() {
+			return (TypeSerializer<Long>) (TypeSerializer<?>) fieldSerializers[0];
+		}
+
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> getValueSerializer() {
+			return (TypeSerializer<T>) fieldSerializers[1];
+		}
+
+		@Override
+		public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() {
+			return new TtlSerializerSnapshot<>(this);
+		}
+	}
+
+	/**
+	 * A {@link TypeSerializerSnapshot} for TtlSerializer.
+	 */
+	public static final class TtlSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<TtlValue<T>, TtlSerializer<T>> {
+
+		private static final int VERSION = 2;
+
+		@SuppressWarnings({"WeakerAccess", "unused"})
+		public TtlSerializerSnapshot() {
+			super(correspondingSerializerClass());
+		}
+
+		TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) {
+			super(serializerInstance);
+		}
+
+		@Override
+		protected int getCurrentOuterSnapshotVersion() {
+			return VERSION;
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(TtlSerializer<T> outerSerializer) {
+			return new TypeSerializer[]{ outerSerializer.getTimestampSerializer(), outerSerializer.getValueSerializer()};
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		protected TtlSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			TypeSerializer<Long> timestampSerializer = (TypeSerializer<Long>) nestedSerializers[0];
+			TypeSerializer<T> valueSerializer = (TypeSerializer<T>) nestedSerializers[1];
+
+			return new TtlSerializer<>(timestampSerializer, valueSerializer);
+		}
+
+		@SuppressWarnings("unchecked")
+		private static <T> Class<TtlSerializer<T>> correspondingSerializerClass() {
+			return (Class<TtlSerializer<T>>) (Class<?>) TtlSerializer.class;
+		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java
new file mode 100644
index 0000000..87ab396
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializer;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializerSnapshot;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * State migration test for {@link TtlSerializerStateMigrationTest}.
+ */
+@RunWith(Parameterized.class)
+public class TtlSerializerStateMigrationTest extends TypeSerializerSnapshotMigrationTestBase<TtlValue<String>> {
+
+	private static final String SPEC_NAME = "ttl-serializer";
+
+	public TtlSerializerStateMigrationTest(TestSpecification<TtlValue<String>> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			TtlSerializer.class,
+			TtlSerializerSnapshot.class,
+			() -> new TtlSerializer<>(LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+		return testSpecifications.get();
+	}
+}
+
diff --git a/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data
new file mode 100644
index 0000000..9d156ca
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data differ
diff --git a/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-snapshot
new file mode 100644
index 0000000..0d3a8a0
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-snapshot differ
diff --git a/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-data b/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-data
new file mode 100644
index 0000000..16c2bda
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-data differ
diff --git a/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-snapshot
new file mode 100644
index 0000000..74aca45
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-snapshot differ