You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/23 09:22:45 UTC

[flink] branch master updated (f388b65 -> c73f2b5)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from f388b65  [hotfix][docs] Fix broken links
     new edf6d59  [FLINK-11328] [core] Upgrade parameterless / singleton serializers to use new serialization compatibility APIs
     new ade21d1  [FLINK-11328] [cep] Snapshots of NFA-related serializers should be a CompositeTypeSerializerSnapshot
     new ef47848  [hotfix] [tests] Remove redundant outdated migration tests in StateBackendTestBase
     new eb1241e  [FLINK-11328] [e2e] Do not use deprecated ParameterlessTypeSerializerConfig in e2e tests
     new c73f2b5  [FLINK-11328] [tests] Add serializer migration tests for all parameterless serializers

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connectors/kafka/FlinkKafkaProducer011.java    |  36 ++
 .../KafkaSerializerSnapshotsMigrationTest.java     |  59 +++
 .../flink-1.6-context-state-serializer-data        | Bin 0 -> 270 bytes
 .../flink-1.6-context-state-serializer-snapshot    | Bin 0 -> 452 bytes
 .../flink-1.6-transaction-state-serializer-data    | Bin 0 -> 110 bytes
 ...flink-1.6-transaction-state-serializer-snapshot | Bin 0 -> 460 bytes
 .../flink-1.7-context-state-serializer-data        | Bin 0 -> 270 bytes
 .../flink-1.7-context-state-serializer-snapshot    | Bin 0 -> 440 bytes
 .../flink-1.7-transaction-state-serializer-data    | Bin 0 -> 110 bytes
 ...flink-1.7-transaction-state-serializer-snapshot | Bin 0 -> 448 bytes
 .../connectors/kafka/FlinkKafkaProducer.java       |  38 ++
 .../KafkaSerializerSnapshotsMigrationTest.java     |  59 +++
 .../flink-1.7-context-state-serializer-data        | Bin 0 -> 270 bytes
 .../flink-1.7-context-state-serializer-snapshot    | Bin 0 -> 434 bytes
 .../flink-1.7-transaction-state-serializer-data    | Bin 0 -> 110 bytes
 ...flink-1.7-transaction-state-serializer-snapshot | Bin 0 -> 442 bytes
 .../ParameterlessTypeSerializerConfig.java         |   4 +
 .../typeutils/SimpleTypeSerializerSnapshot.java    |  88 +---
 .../common/typeutils/base/BigDecSerializer.java    |   3 +-
 .../common/typeutils/base/BigIntSerializer.java    |   3 +-
 .../common/typeutils/base/BooleanSerializer.java   |   3 +-
 .../typeutils/base/BooleanValueSerializer.java     |   3 +-
 .../api/common/typeutils/base/ByteSerializer.java  |   3 +-
 .../common/typeutils/base/ByteValueSerializer.java |   3 +-
 .../api/common/typeutils/base/CharSerializer.java  |   3 +-
 .../common/typeutils/base/CharValueSerializer.java |   3 +-
 .../api/common/typeutils/base/DateSerializer.java  |   3 +-
 .../common/typeutils/base/DoubleSerializer.java    |   3 +-
 .../typeutils/base/DoubleValueSerializer.java      |   3 +-
 .../api/common/typeutils/base/FloatSerializer.java |   3 +-
 .../typeutils/base/FloatValueSerializer.java       |   3 +-
 .../common/typeutils/base/InstantSerializer.java   |   3 +-
 .../api/common/typeutils/base/IntSerializer.java   |   3 +-
 .../common/typeutils/base/IntValueSerializer.java  |   3 +-
 .../api/common/typeutils/base/LongSerializer.java  |   3 +-
 .../common/typeutils/base/LongValueSerializer.java |   3 +-
 .../common/typeutils/base/NullValueSerializer.java |   3 +-
 .../api/common/typeutils/base/ShortSerializer.java |   3 +-
 .../typeutils/base/ShortValueSerializer.java       |   3 +-
 .../common/typeutils/base/SqlDateSerializer.java   |   3 +-
 .../common/typeutils/base/SqlTimeSerializer.java   |   3 +-
 .../typeutils/base/SqlTimestampSerializer.java     |   3 +-
 .../common/typeutils/base/StringSerializer.java    |   3 +-
 .../typeutils/base/StringValueSerializer.java      |   3 +-
 .../typeutils/base/TypeSerializerSingleton.java    |  12 +-
 .../api/common/typeutils/base/VoidSerializer.java  |   3 +-
 .../array/BooleanPrimitiveArraySerializer.java     |   3 +-
 .../base/array/BytePrimitiveArraySerializer.java   |   3 +-
 .../base/array/CharPrimitiveArraySerializer.java   |   3 +-
 .../base/array/DoublePrimitiveArraySerializer.java |   3 +-
 .../base/array/FloatPrimitiveArraySerializer.java  |   3 +-
 .../base/array/IntPrimitiveArraySerializer.java    |   3 +-
 .../base/array/LongPrimitiveArraySerializer.java   |   3 +-
 .../base/array/ShortPrimitiveArraySerializer.java  |   3 +-
 .../base/array/StringArraySerializer.java          |   3 +-
 .../StatefulComplexPayloadSerializer.java          |  32 +-
 .../java/org/apache/flink/cep/nfa/DeweyNumber.java |  28 +-
 .../apache/flink/cep/nfa/NFAStateSerializer.java   |  21 +-
 .../apache/flink/cep/nfa/sharedbuffer/EventId.java |  36 +-
 .../apache/flink/cep/nfa/sharedbuffer/NodeId.java  |  86 +++-
 .../flink/cep/nfa/sharedbuffer/SharedBuffer.java   |   2 +-
 .../cep/nfa/sharedbuffer/SharedBufferEdge.java     |  92 +++-
 .../cep/nfa/sharedbuffer/SharedBufferNode.java     |  57 ++-
 .../cep/NFASerializerSnapshotsMigrationTest.java   |  78 ++++
 .../flink-1.6-dewey-number-serializer-data         | Bin 0 -> 80 bytes
 .../flink-1.6-dewey-number-serializer-snapshot     | Bin 0 -> 547 bytes
 .../resources/flink-1.6-event-id-serializer-data   | Bin 0 -> 120 bytes
 .../flink-1.6-event-id-serializer-snapshot         | Bin 0 -> 402 bytes
 .../resources/flink-1.6-node-id-serializer-data    | Bin 0 -> 250 bytes
 .../flink-1.6-node-id-serializer-snapshot          | Bin 0 -> 398 bytes
 .../flink-1.6-shared-buffer-edge-serializer-data   | Bin 0 -> 330 bytes
 ...link-1.6-shared-buffer-edge-serializer-snapshot | Bin 0 -> 438 bytes
 .../flink-1.6-shared-buffer-node-serializer-data   | Bin 0 -> 370 bytes
 ...link-1.6-shared-buffer-node-serializer-snapshot | Bin 0 -> 775 bytes
 .../flink-1.7-dewey-number-serializer-data         | Bin 0 -> 80 bytes
 .../flink-1.7-dewey-number-serializer-snapshot     | Bin 0 -> 535 bytes
 .../resources/flink-1.7-event-id-serializer-data   | Bin 0 -> 120 bytes
 .../flink-1.7-event-id-serializer-snapshot         | Bin 0 -> 390 bytes
 .../resources/flink-1.7-node-id-serializer-data    | Bin 0 -> 250 bytes
 .../flink-1.7-node-id-serializer-snapshot          | Bin 0 -> 386 bytes
 .../flink-1.7-shared-buffer-edge-serializer-data   | Bin 0 -> 330 bytes
 ...link-1.7-shared-buffer-edge-serializer-snapshot | Bin 0 -> 426 bytes
 .../flink-1.7-shared-buffer-node-serializer-data   | Bin 0 -> 370 bytes
 ...link-1.7-shared-buffer-node-serializer-snapshot | Bin 0 -> 763 bytes
 flink-libraries/flink-gelly-examples/pom.xml       |   8 +
 .../transform/LongValueWithProperHashCode.java     |  21 +-
 ...perHashCodeSerializerSnapshotMigrationTest.java |  55 +++
 ...ong-value-with-proper-hash-code-serializer-data | Bin 0 -> 80 bytes
 ...value-with-proper-hash-code-serializer-snapshot | Bin 0 -> 488 bytes
 ...ong-value-with-proper-hash-code-serializer-data | Bin 0 -> 80 bytes
 ...value-with-proper-hash-code-serializer-snapshot | Bin 0 -> 476 bytes
 .../types/valuearray/ByteValueArraySerializer.java |  20 +-
 .../types/valuearray/CharValueArraySerializer.java |  21 +-
 .../valuearray/DoubleValueArraySerializer.java     |  21 +-
 .../valuearray/FloatValueArraySerializer.java      |  21 +-
 .../types/valuearray/IntValueArraySerializer.java  |  21 +-
 .../types/valuearray/LongValueArraySerializer.java |  21 +-
 .../types/valuearray/NullValueArraySerializer.java |  20 +
 .../valuearray/ShortValueArraySerializer.java      |  21 +-
 .../valuearray/StringValueArraySerializer.java     |  21 +-
 .../ValueArraySerializerSnapshotMigrationTest.java |  93 ++++
 .../flink-1.6-byte-value-array-serializer-data     | Bin 0 -> 70 bytes
 .../flink-1.6-byte-value-array-serializer-snapshot | Bin 0 -> 404 bytes
 .../flink-1.6-char-value-array-serializer-data     | Bin
 .../flink-1.6-char-value-array-serializer-snapshot | Bin 0 -> 404 bytes
 .../flink-1.6-double-value-array-serializer-data   | Bin 0 -> 280 bytes
 ...link-1.6-double-value-array-serializer-snapshot | Bin 0 -> 408 bytes
 .../flink-1.6-float-value-array-serializer-data    | Bin 0 -> 160 bytes
 ...flink-1.6-float-value-array-serializer-snapshot | Bin 0 -> 406 bytes
 .../flink-1.6-int-value-array-serializer-data      | Bin
 .../flink-1.6-int-value-array-serializer-snapshot  | Bin 0 -> 402 bytes
 .../flink-1.6-long-value-array-serializer-data     | Bin 0 -> 280 bytes
 .../flink-1.6-long-value-array-serializer-snapshot | Bin 0 -> 404 bytes
 .../flink-1.6-null-value-array-serializer-data     | Bin 0 -> 40 bytes
 .../flink-1.6-null-value-array-serializer-snapshot | Bin 0 -> 404 bytes
 .../flink-1.6-short-value-array-serializer-data    | Bin 0 -> 100 bytes
 ...flink-1.6-short-value-array-serializer-snapshot | Bin 0 -> 406 bytes
 .../flink-1.6-string-value-array-serializer-data   | Bin 0 -> 280 bytes
 ...link-1.6-string-value-array-serializer-snapshot | Bin 0 -> 408 bytes
 .../resources/flink-1.7-array-list-serializer-data | Bin
 .../flink-1.7-array-list-serializer-snapshot       | Bin
 ...k-1.7-avro-generic-type-serializer-address-data | Bin
 ...7-avro-generic-type-serializer-address-snapshot | Bin
 .../flink-1.7-avro-type-serializer-address-data    | Bin
 ...flink-1.7-avro-type-serializer-address-snapshot | Bin
 .../flink-1.7-byte-value-array-serializer-data     | Bin 0 -> 70 bytes
 .../flink-1.7-byte-value-array-serializer-snapshot | Bin 0 -> 392 bytes
 .../flink-1.7-char-value-array-serializer-data     | Bin
 .../flink-1.7-char-value-array-serializer-snapshot | Bin 0 -> 392 bytes
 .../flink-1.7-double-value-array-serializer-data   | Bin 0 -> 280 bytes
 ...link-1.7-double-value-array-serializer-snapshot | Bin 0 -> 396 bytes
 .../resources/flink-1.7-either-serializer-data     |   0
 .../resources/flink-1.7-either-serializer-snapshot | Bin
 .../flink-1.7-float-value-array-serializer-data    | Bin 0 -> 160 bytes
 ...flink-1.7-float-value-array-serializer-snapshot | Bin 0 -> 394 bytes
 .../flink-1.7-int-value-array-serializer-data      | Bin
 .../flink-1.7-int-value-array-serializer-snapshot  | Bin 0 -> 390 bytes
 .../flink-1.7-long-value-array-serializer-data     | Bin 0 -> 280 bytes
 .../flink-1.7-long-value-array-serializer-snapshot | Bin 0 -> 392 bytes
 .../flink-1.7-null-value-array-serializer-data     | Bin 0 -> 40 bytes
 .../flink-1.7-null-value-array-serializer-snapshot | Bin 0 -> 392 bytes
 .../flink-1.7-short-value-array-serializer-data    | Bin 0 -> 100 bytes
 ...flink-1.7-short-value-array-serializer-snapshot | Bin 0 -> 394 bytes
 .../flink-1.7-string-value-array-serializer-data   | Bin 0 -> 280 bytes
 ...link-1.7-string-value-array-serializer-snapshot | Bin 0 -> 396 bytes
 .../client/VoidNamespaceSerializer.java            |  20 +
 .../apache/flink/runtime/state/JavaSerializer.java |  20 +
 .../runtime/state/VoidNamespaceSerializer.java     |  20 +
 .../state/JavaSerializerSnapshotMigrationTest.java |  57 +++
 .../runtime/state/MemoryStateBackendTest.java      | 140 ------
 .../runtime/state/OperatorStateBackendTest.java    |  59 ---
 .../flink/runtime/state/StateBackendTestBase.java  | 473 ---------------------
 ...dNamespacieSerializerSnapshotMigrationTest.java |  55 +++
 .../test/resources/flink-1.6-java-serializer-data  | Bin 0 -> 180 bytes
 .../resources/flink-1.6-java-serializer-snapshot   | Bin 0 -> 366 bytes
 .../flink-1.6-void-namespace-serializer-data       | Bin
 .../flink-1.6-void-namespace-serializer-snapshot   | Bin 0 -> 384 bytes
 .../test/resources/flink-1.7-java-serializer-data  | Bin 0 -> 180 bytes
 .../resources/flink-1.7-java-serializer-snapshot   | Bin 0 -> 354 bytes
 .../flink-1.7-void-namespace-serializer-data       | Bin
 .../flink-1.7-void-namespace-serializer-snapshot   | Bin 0 -> 372 bytes
 .../flink/api/scala/typeutils/UnitSerializer.scala |  27 ++
 .../api/windowing/windows/GlobalWindow.java        |  20 +
 .../api/windowing/windows/TimeWindow.java          |  20 +
 .../WindowSerializerSnapshotsMigrationTest.java    |  60 +++
 .../flink-1.6-global-window-serializer-data        | Bin
 .../flink-1.6-global-window-serializer-snapshot    | Bin 0 -> 420 bytes
 .../flink-1.6-time-window-serializer-data          | Bin 0 -> 160 bytes
 .../flink-1.6-time-window-serializer-snapshot      | Bin 0 -> 416 bytes
 .../flink-1.7-global-window-serializer-data        | Bin
 .../flink-1.7-global-window-serializer-snapshot    | Bin 0 -> 408 bytes
 .../flink-1.7-time-window-serializer-data          | Bin 0 -> 160 bytes
 .../flink-1.7-time-window-serializer-snapshot      | Bin 0 -> 404 bytes
 .../jar/CheckpointingCustomKvStateProgram.java     |  20 +
 174 files changed, 1312 insertions(+), 879 deletions(-)
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerSnapshotsMigrationTest.java
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-context-state-serializer-data
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-context-state-serializer-snapshot
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-transaction-state-serializer-data
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-transaction-state-serializer-snapshot
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-context-state-serializer-data
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-context-state-serializer-snapshot
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-transaction-state-serializer-data
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-transaction-state-serializer-snapshot
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerSnapshotsMigrationTest.java
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-context-state-serializer-data
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-context-state-serializer-snapshot
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-transaction-state-serializer-data
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-transaction-state-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-dewey-number-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-dewey-number-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-event-id-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-event-id-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-node-id-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-node-id-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-edge-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-edge-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-node-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-node-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-dewey-number-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-dewey-number-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-event-id-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-event-id-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-node-id-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-node-id-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-edge-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-edge-serializer-snapshot
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-node-serializer-data
 create mode 100644 flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-node-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCodeSerializerSnapshotMigrationTest.java
 create mode 100644 flink-libraries/flink-gelly-examples/src/test/resources/flink-1.6-long-value-with-proper-hash-code-serializer-data
 create mode 100644 flink-libraries/flink-gelly-examples/src/test/resources/flink-1.6-long-value-with-proper-hash-code-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly-examples/src/test/resources/flink-1.7-long-value-with-proper-hash-code-serializer-data
 create mode 100644 flink-libraries/flink-gelly-examples/src/test/resources/flink-1.7-long-value-with-proper-hash-code-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArraySerializerSnapshotMigrationTest.java
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-byte-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-byte-value-array-serializer-snapshot
 copy flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-data => flink-libraries/flink-gelly/src/test/resources/flink-1.6-char-value-array-serializer-data (100%)
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-char-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-double-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-double-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-float-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-float-value-array-serializer-snapshot
 copy flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-data => flink-libraries/flink-gelly/src/test/resources/flink-1.6-int-value-array-serializer-data (100%)
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-int-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-long-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-long-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-null-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-null-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-short-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-short-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-string-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.6-string-value-array-serializer-snapshot
 copy flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-data => flink-libraries/flink-gelly/src/test/resources/flink-1.7-array-list-serializer-data (100%)
 copy flink-runtime/src/test/resources/flink-1.7-arraylist-serializer-snapshot => flink-libraries/flink-gelly/src/test/resources/flink-1.7-array-list-serializer-snapshot (100%)
 copy flink-formats/flink-avro/src/test/resources/flink-1.7-avro-type-serializer-address-data => flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-generic-type-serializer-address-data (100%)
 copy {flink-formats/flink-avro => flink-libraries/flink-gelly}/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot (100%)
 copy {flink-formats/flink-avro => flink-libraries/flink-gelly}/src/test/resources/flink-1.7-avro-type-serializer-address-data (100%)
 copy {flink-formats/flink-avro => flink-libraries/flink-gelly}/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot (100%)
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-byte-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-byte-value-array-serializer-snapshot
 copy flink-core/src/test/resources/flink-1.7-char-primitive-array-serializer-data => flink-libraries/flink-gelly/src/test/resources/flink-1.7-char-value-array-serializer-data (100%)
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-char-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-double-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-double-value-array-serializer-snapshot
 copy {flink-core => flink-libraries/flink-gelly}/src/test/resources/flink-1.7-either-serializer-data (100%)
 copy {flink-core => flink-libraries/flink-gelly}/src/test/resources/flink-1.7-either-serializer-snapshot (100%)
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-float-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-float-value-array-serializer-snapshot
 copy flink-core/src/test/resources/flink-1.7-int-primitive-array-serializer-data => flink-libraries/flink-gelly/src/test/resources/flink-1.7-int-value-array-serializer-data (100%)
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-int-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-long-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-long-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-null-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-null-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-short-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-short-value-array-serializer-snapshot
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-string-value-array-serializer-data
 create mode 100644 flink-libraries/flink-gelly/src/test/resources/flink-1.7-string-value-array-serializer-snapshot
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerSnapshotMigrationTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespacieSerializerSnapshotMigrationTest.java
 create mode 100644 flink-runtime/src/test/resources/flink-1.6-java-serializer-data
 create mode 100644 flink-runtime/src/test/resources/flink-1.6-java-serializer-snapshot
 copy flink-core/src/test/resources/flink-1.7-boolean-value-serializer-data => flink-runtime/src/test/resources/flink-1.6-void-namespace-serializer-data (100%)
 create mode 100644 flink-runtime/src/test/resources/flink-1.6-void-namespace-serializer-snapshot
 create mode 100644 flink-runtime/src/test/resources/flink-1.7-java-serializer-data
 create mode 100644 flink-runtime/src/test/resources/flink-1.7-java-serializer-snapshot
 copy flink-core/src/test/resources/flink-1.7-boolean-value-serializer-data => flink-runtime/src/test/resources/flink-1.7-void-namespace-serializer-data (100%)
 create mode 100644 flink-runtime/src/test/resources/flink-1.7-void-namespace-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowSerializerSnapshotsMigrationTest.java
 copy flink-core/src/test/resources/flink-1.7-boolean-value-serializer-data => flink-streaming-java/src/test/resources/flink-1.6-global-window-serializer-data (100%)
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-global-window-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-time-window-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-time-window-serializer-snapshot
 copy flink-core/src/test/resources/flink-1.7-boolean-value-serializer-data => flink-streaming-java/src/test/resources/flink-1.7-global-window-serializer-data (100%)
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-global-window-serializer-snapshot
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-time-window-serializer-data
 create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-time-window-serializer-snapshot


[flink] 04/05: [FLINK-11328] [e2e] Do not use deprecated ParameterlessTypeSerializerConfig in e2e tests

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eb1241eec517d3f906625bc67e722d097a9514cd
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Jan 18 14:12:51 2019 +0100

    [FLINK-11328] [e2e] Do not use deprecated ParameterlessTypeSerializerConfig in e2e tests
---
 .../StatefulComplexPayloadSerializer.java          | 32 ++++++++++------------
 1 file changed, 14 insertions(+), 18 deletions(-)

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java
index 89b16b6..294241b 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.streaming.tests.artificialstate;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -143,21 +141,8 @@ public class StatefulComplexPayloadSerializer extends TypeSerializer<ComplexPayl
 	}
 
 	@Override
-	public TypeSerializerConfigSnapshot<ComplexPayload> snapshotConfiguration() {
-		// type serializer singletons should always be parameter-less
-		return new ParameterlessTypeSerializerConfig<>(getSerializationFormatIdentifier());
-	}
-
-	@Override
-	public CompatibilityResult<ComplexPayload> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		if (configSnapshot instanceof ParameterlessTypeSerializerConfig
-			&& isCompatibleSerializationFormatIdentifier(
-			((ParameterlessTypeSerializerConfig<?>) configSnapshot).getSerializationFormatIdentifier())) {
-
-			return CompatibilityResult.compatible();
-		} else {
-			return CompatibilityResult.requiresMigration();
-		}
+	public Snapshot snapshotConfiguration() {
+		return new Snapshot();
 	}
 
 	private boolean isCompatibleSerializationFormatIdentifier(String identifier) {
@@ -167,4 +152,15 @@ public class StatefulComplexPayloadSerializer extends TypeSerializer<ComplexPayl
 	private String getSerializationFormatIdentifier() {
 		return getClass().getCanonicalName();
 	}
+
+	// ----------------------------------------------------------------------------------------
+
+	/**
+	 * Snapshot for the {@link StatefulComplexPayloadSerializer}.
+	 */
+	public static class Snapshot extends SimpleTypeSerializerSnapshot<ComplexPayload> {
+		public Snapshot() {
+			super(StatefulComplexPayloadSerializer::new);
+		}
+	}
 }


[flink] 03/05: [hotfix] [tests] Remove redundant outdated migration tests in StateBackendTestBase

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef4784835b2f3805dcfe3d4ef0abf5cbf83e9ba1
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Jan 18 14:02:13 2019 +0100

    [hotfix] [tests] Remove redundant outdated migration tests in StateBackendTestBase
    
    Those tests were implemented before we had the state schema evolution
    feature in Flink 1.7, and were implemented using the old legacy
    serialization compatibility APIs. Moreover, the behaviours that they
    test are already more comprehensively covered in
    StateBackendMigrationTestBase.
---
 .../runtime/state/MemoryStateBackendTest.java      | 140 ------
 .../runtime/state/OperatorStateBackendTest.java    |  59 ---
 .../flink/runtime/state/StateBackendTestBase.java  | 473 ---------------------
 3 files changed, 672 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 3b157ba..2ac1ea1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -18,34 +18,11 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.StateObjectCollection;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
-import org.apache.flink.util.ExceptionUtils;
 
-import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.concurrent.RunnableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 /**
  * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}.
  */
@@ -82,123 +59,6 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 	@Test
 	public void testMapStateRestoreWithWrongSerializers() {}
 
-
-
-	/**
-	 * Verifies that the operator state backend fails with appropriate error and message if
-	 * previous serializer can not be restored.
-	 */
-	@Test
-	public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
-		DummyEnvironment env = new DummyEnvironment();
-		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
-
-		OperatorStateBackend operatorStateBackend =
-			abstractStateBackend.createOperatorStateBackend(env, "test-op-name");
-
-		// write some state
-		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
-		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
-		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
-		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
-		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
-		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
-
-		listState1.add(42);
-		listState1.add(4711);
-
-		listState2.add(7);
-		listState2.add(13);
-		listState2.add(23);
-
-		listState3.add(17);
-		listState3.add(18);
-		listState3.add(19);
-		listState3.add(20);
-
-		CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
-
-		RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
-			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
-		OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
-
-		try {
-
-			operatorStateBackend.close();
-			operatorStateBackend.dispose();
-
-			env = new DummyEnvironment(
-				new ArtificialCNFExceptionThrowingClassLoader(
-					getClass().getClassLoader(),
-					Collections.singleton(JavaSerializer.class.getName())));
-
-			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
-				env,
-				"testOperator");
-
-			operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
-
-			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (Exception expected) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
-		} finally {
-			stateHandle.discardState();
-		}
-	}
-
-	/**
-	 * Verifies that memory-backed keyed state backend fails with appropriate error and message if
-	 * previous serializer can not be restored.
-	 */
-	@Test
-	public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
-
-		ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null);
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
-
-		HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend;
-
-		assertEquals(0, heapBackend.numKeyValueStateEntries());
-
-		ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-		// write some state
-		backend.setCurrentKey(0);
-		state.update("hello");
-		state.update("ciao");
-
-		KeyedStateHandle snapshot = runSnapshot(
-			((HeapKeyedStateBackend<Integer>) backend).snapshot(
-				682375462378L,
-				2,
-				streamFactory,
-				CheckpointOptions.forCheckpointWithDefaultLocation()),
-			sharedStateRegistry);
-
-		backend.dispose();
-
-		// ========== restore snapshot ==========
-
-		try {
-			restoreKeyedBackend(
-				IntSerializer.INSTANCE,
-				snapshot,
-				new DummyEnvironment(
-					new ArtificialCNFExceptionThrowingClassLoader(
-						getClass().getClassLoader(),
-						Collections.singleton(StringSerializer.class.getName()))));
-
-			fail("The keyed state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (Exception expected) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
-		}
-	}
-
 	@Ignore
 	@Test
 	public void testConcurrentMapIfQueryable() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 178671b..3918303 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -36,14 +36,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
-import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
@@ -52,7 +49,6 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -843,61 +839,6 @@ public class OperatorStateBackendTest {
 		}
 	}
 
-	@Test
-	public void testRestoreFailsIfSerializerDeserializationFails() throws Exception {
-		AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);
-
-		OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");
-
-		// write some state
-		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
-		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
-		ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
-		ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
-		ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
-		ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
-
-		listState1.add(42);
-		listState1.add(4711);
-
-		listState2.add(7);
-		listState2.add(13);
-		listState2.add(23);
-
-		listState3.add(17);
-		listState3.add(18);
-		listState3.add(19);
-		listState3.add(20);
-
-		CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
-		RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
-			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
-		OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
-
-		try {
-
-			operatorStateBackend.close();
-			operatorStateBackend.dispose();
-
-			operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
-				new DummyEnvironment(
-					new ArtificialCNFExceptionThrowingClassLoader(
-						getClass().getClassLoader(),
-						Collections.singleton(JavaSerializer.class.getName()))),
-				"testOperator");
-
-			operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
-
-			fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
-		} catch (Exception expected) {
-			Assert.assertTrue(ExceptionUtils.findThrowable(expected, ClassNotFoundException.class).isPresent());
-		} finally {
-			stateHandle.discardState();
-		}
-	}
-
 	static final class MutableType implements Serializable {
 
 		private static final long serialVersionUID = 1L;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f51ab31..f1269fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -38,10 +38,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -52,11 +49,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
@@ -80,9 +72,7 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
-import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
@@ -93,12 +83,10 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -107,7 +95,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.PrimitiveIterator;
 import java.util.Random;
 import java.util.Timer;
@@ -970,282 +957,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
-	public void testStateSerializerReconfiguration() throws Exception {
-
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		Environment env = new DummyEnvironment();
-
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
-
-		try {
-			ValueStateDescriptor<TestCustomStateClass> kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
-			ValueState<TestCustomStateClass> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			// ============== create snapshot, using the non-reconfigured serializer ==============
-
-			// make some modifications
-			backend.setCurrentKey(1);
-			state.update(new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
-
-			backend.setCurrentKey(2);
-			state.update(new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
-
-			// verify our assumption that the serializer is not yet reconfigured;
-			// we cast the state handle to the internal representation in order to retrieve the serializer
-			InternalKvState internal = (InternalKvState) state;
-			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
-			assertFalse(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
-
-			KeyedStateHandle snapshot1 = runSnapshot(
-				backend.snapshot(
-					682375462378L,
-					2,
-					streamFactory,
-					CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-
-			backend.dispose();
-
-			// ========== restore snapshot, which should reconfigure the serializer, and then create a snapshot again ==========
-
-			env = new DummyEnvironment();
-
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1, env);
-
-			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
-			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			// verify that the serializer used is correctly reconfigured
-			internal = (InternalKvState) state;
-			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
-			assertTrue(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
-
-			backend.setCurrentKey(1);
-			TestCustomStateClass restoredState1 = state.value();
-			assertEquals("test-message-1", restoredState1.getMessage());
-			// the previous serializer schema does not contain the extra message
-			assertNull(restoredState1.getExtraMessage());
-
-			state.update(new TestCustomStateClass("new-test-message-1", "extra-message-1"));
-
-			backend.setCurrentKey(2);
-			TestCustomStateClass restoredState2 = state.value();
-			assertEquals("test-message-2", restoredState2.getMessage());
-			assertNull(restoredState1.getExtraMessage());
-
-			state.update(new TestCustomStateClass("new-test-message-2", "extra-message-2"));
-
-			KeyedStateHandle snapshot2 = runSnapshot(
-				backend.snapshot(
-					682375462379L,
-					3,
-					streamFactory,
-					CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-
-			snapshot1.discardState();
-			backend.dispose();
-
-			// ========== restore snapshot again; state should now be in the new schema containing the extra message ==========
-
-			env = new DummyEnvironment();
-
-			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
-
-			snapshot2.discardState();
-
-			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializer());
-			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			internal = (InternalKvState) state;
-			assertTrue(internal.getValueSerializer() instanceof TestReconfigurableCustomTypeSerializer);
-			assertTrue(((TestReconfigurableCustomTypeSerializer) internal.getValueSerializer()).isReconfigured());
-
-			backend.setCurrentKey(1);
-			restoredState1 = state.value();
-			assertEquals("new-test-message-1", restoredState1.getMessage());
-			assertEquals("extra-message-1", restoredState1.getExtraMessage());
-
-			backend.setCurrentKey(2);
-			restoredState2 = state.value();
-			assertEquals("new-test-message-2", restoredState2.getMessage());
-			assertEquals("extra-message-2", restoredState2.getExtraMessage());
-		} finally {
-			backend.dispose();
-		}
-	}
-
-	@Test
-	public void testSerializerPresenceOnRestore() throws Exception {
-		CheckpointStreamFactory streamFactory = createStreamFactory();
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-		Environment env = new DummyEnvironment();
-
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
-
-		try {
-			ValueStateDescriptor<TestCustomStateClass> kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializerPreUpgrade());
-			ValueState<TestCustomStateClass> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			// ============== create snapshot, using the old serializer ==============
-
-			// make some modifications
-			backend.setCurrentKey(1);
-			state.update(new TestCustomStateClass("test-message-1", "this-should-be-ignored"));
-
-			backend.setCurrentKey(2);
-			state.update(new TestCustomStateClass("test-message-2", "this-should-be-ignored"));
-
-			KeyedStateHandle snapshot1 = runSnapshot(
-				backend.snapshot(
-					682375462378L,
-					2,
-					streamFactory,
-					CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-
-			backend.dispose();
-
-			// ========== restore snapshot, using the new serializer (that has different classname) ==========
-
-			// on restore, simulate that the previous serializer class is no longer in the classloader
-			env = new DummyEnvironment(
-				new ArtificialCNFExceptionThrowingClassLoader(
-					getClass().getClassLoader(),
-					Collections.singleton(TestReconfigurableCustomTypeSerializerPreUpgrade.class.getName())));
-
-			try {
-				backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1, env);
-			} catch (IOException e) {
-				if (!isSerializerPresenceRequiredOnRestore()) {
-					fail("Presence of old serializer should not have been required.");
-				} else {
-					// test success
-					return;
-				}
-			}
-
-			// if serializer presence is not required, continue on to modify some state to make sure that everything works correctly
-			kvId = new ValueStateDescriptor<>("id", new TestReconfigurableCustomTypeSerializerUpgraded());
-			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
-
-			backend.setCurrentKey(1);
-			state.update(new TestCustomStateClass("new-test-message-1", "extra-message-1"));
-
-			backend.setCurrentKey(2);
-			state.update(new TestCustomStateClass("new-test-message-2", "extra-message-2"));
-
-			KeyedStateHandle snapshot2 = runSnapshot(
-				backend.snapshot(
-					682375462379L,
-					3,
-					streamFactory,
-					CheckpointOptions.forCheckpointWithDefaultLocation()),
-				sharedStateRegistry);
-
-			snapshot1.discardState();
-		} finally {
-			backend.dispose();
-		}
-	}
-
-	@Test
-	public void testPriorityQueueSerializerUpdates() throws Exception {
-
-		final String stateName = "test";
-		final CheckpointStreamFactory streamFactory = createStreamFactory();
-		final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-
-		AbstractKeyedStateBackend<Integer> keyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
-
-		try {
-			TypeSerializer<InternalPriorityQueueTestBase.TestElement> serializer =
-				InternalPriorityQueueTestBase.TestElementSerializer.INSTANCE;
-
-			KeyGroupedInternalPriorityQueue<InternalPriorityQueueTestBase.TestElement> priorityQueue =
-				keyedBackend.create(stateName, serializer);
-
-			priorityQueue.add(new InternalPriorityQueueTestBase.TestElement(42L, 0L));
-
-			RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
-				keyedBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-			KeyedStateHandle keyedStateHandle = runSnapshot(snapshot, sharedStateRegistry);
-
-			keyedBackend.dispose();
-
-			// test restore with a modified but compatible serializer ---------------------------
-
-			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
-
-			serializer = new ModifiedTestElementSerializer();
-
-			priorityQueue = keyedBackend.create(stateName, serializer);
-
-			final InternalPriorityQueueTestBase.TestElement checkElement =
-				new InternalPriorityQueueTestBase.TestElement(4711L, 1L);
-			priorityQueue.add(checkElement);
-
-			snapshot = keyedBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-
-			keyedStateHandle = runSnapshot(snapshot, sharedStateRegistry);
-
-			keyedBackend.dispose();
-
-			// test that the modified serializer was actually used ---------------------------
-
-			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
-			priorityQueue = keyedBackend.create(stateName, serializer);
-
-			priorityQueue.poll();
-
-			ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos();
-			DataOutputViewStreamWrapper outWrapper = new DataOutputViewStreamWrapper(out);
-			serializer.serialize(checkElement, outWrapper);
-			InternalPriorityQueueTestBase.TestElement expected =
-				serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(out.toByteArray())));
-
-			Assert.assertEquals(
-				expected,
-				priorityQueue.poll());
-			Assert.assertTrue(priorityQueue.isEmpty());
-
-			keyedBackend.dispose();
-
-			// test that incompatible serializer is rejected ---------------------------
-
-			serializer = InternalPriorityQueueTestBase.TestElementSerializer.INSTANCE;
-			keyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, keyedStateHandle);
-
-			try {
-				// this is expected to fail, because the old and new serializer should be incompatible through
-				// different revision numbers.
-				keyedBackend.create("test", serializer);
-				Assert.fail("Expected exception from incompatible serializer.");
-			} catch (Exception e) {
-				Assert.assertTrue("Exception was not caused by state migration: " + e,
-					ExceptionUtils.findThrowable(e, StateMigrationException.class).isPresent());
-			}
-		} finally {
-			keyedBackend.dispose();
-		}
-	}
-
-	public static class ModifiedTestElementSerializer extends InternalPriorityQueueTestBase.TestElementSerializer {
-
-		@Override
-		public void serialize(InternalPriorityQueueTestBase.TestElement record, DataOutputView target) throws IOException {
-			super.serialize(new InternalPriorityQueueTestBase.TestElement(record.getKey() + 1, record.getPriority() + 1), target);
-		}
-
-		@Override
-		protected int getRevision() {
-			return super.getRevision() + 1;
-		}
-	}
-
-	@Test
 	@SuppressWarnings("unchecked")
 	public void testValueState() throws Exception {
 		CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -4573,190 +4284,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	/**
-	 * Custom state class used for testing state serializer schema migration.
-	 * The corresponding serializer used in the tests is {@link TestReconfigurableCustomTypeSerializer}.
-	 */
-	public static class TestCustomStateClass {
-
-		private String message;
-		private String extraMessage;
-
-		public TestCustomStateClass(String message, String extraMessage) {
-			this.message = message;
-			this.extraMessage = extraMessage;
-		}
-
-		public String getMessage() {
-			return message;
-		}
-
-		public void setMessage(String message) {
-			this.message = message;
-		}
-
-		public String getExtraMessage() {
-			return extraMessage;
-		}
-
-		public void setExtraMessage(String extraMessage) {
-			this.extraMessage = extraMessage;
-		}
-	}
-
-	/**
-	 * A reconfigurable serializer that simulates backwards compatible schema evolution for the {@link TestCustomStateClass}.
-	 * A flag is maintained to determine whether or not the serializer has been reconfigured.
-	 * Whether or not it has been reconfigured affects which fields of {@link TestCustomStateClass} instances are
-	 * written and read on serialization.
-	 */
-	public static class TestReconfigurableCustomTypeSerializer extends TypeSerializer<TestCustomStateClass> {
-
-		private boolean reconfigured = false;
-
-		public TestReconfigurableCustomTypeSerializer() {}
-
-		/** Copy constructor. */
-		private TestReconfigurableCustomTypeSerializer(boolean reconfigured) {
-			this.reconfigured = reconfigured;
-		}
-
-		@Override
-		public TypeSerializer<TestCustomStateClass> duplicate() {
-			return new TestReconfigurableCustomTypeSerializer(reconfigured);
-		}
-
-		@Override
-		public TestCustomStateClass createInstance() {
-			return new TestCustomStateClass(null, null);
-		}
-
-		@Override
-		public void serialize(TestCustomStateClass record, DataOutputView target) throws IOException {
-			target.writeBoolean(reconfigured);
-
-			target.writeUTF(record.getMessage());
-			if (reconfigured) {
-				target.writeUTF(record.getExtraMessage());
-			}
-		}
-
-		@Override
-		public TestCustomStateClass deserialize(DataInputView source) throws IOException {
-			boolean isNewSchema = source.readBoolean();
-
-			String message = source.readUTF();
-			if (isNewSchema) {
-				return new TestCustomStateClass(message, source.readUTF());
-			} else {
-				return new TestCustomStateClass(message, null);
-			}
-		}
-
-		@Override
-		public TestCustomStateClass deserialize(TestCustomStateClass reuse, DataInputView source) throws IOException {
-			boolean isNewSchema = source.readBoolean();
-
-			String message = source.readUTF();
-			if (isNewSchema) {
-				reuse.setMessage(message);
-				reuse.setExtraMessage(source.readUTF());
-				return reuse;
-			} else {
-				reuse.setMessage(message);
-				reuse.setExtraMessage(null);
-				return reuse;
-			}
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			boolean reconfigured = source.readBoolean();
-
-			target.writeUTF(source.readUTF());
-			if (reconfigured)
-			target.writeUTF(source.readUTF());
-		}
-
-		@Override
-		public TestCustomStateClass copy(TestCustomStateClass from) {
-			return new TestCustomStateClass(from.getMessage(), from.getExtraMessage());
-		}
-
-		@Override
-		public TestCustomStateClass copy(TestCustomStateClass from, TestCustomStateClass reuse) {
-			reuse.setMessage(from.getMessage());
-			reuse.setExtraMessage(from.getExtraMessage());
-			return reuse;
-		}
-
-		@Override
-		public int getLength() {
-			return 0;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof TestReconfigurableCustomTypeSerializer;
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj == null) {
-				return false;
-			}
-
-			if (!(obj instanceof TestReconfigurableCustomTypeSerializer)) {
-				return false;
-			}
-
-			if (obj == this) {
-				return true;
-			} else {
-				TestReconfigurableCustomTypeSerializer other = (TestReconfigurableCustomTypeSerializer) obj;
-				return other.reconfigured == this.reconfigured;
-			}
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(getClass().getName(), reconfigured);
-		}
-
-		// -- reconfiguration --
-
-		public boolean isReconfigured() {
-			return reconfigured;
-		}
-
-		// -- config snapshot --
-
-		@Override
-		public TypeSerializerConfigSnapshot<TestCustomStateClass> snapshotConfiguration() {
-			return new ParameterlessTypeSerializerConfig<>(getClass().getName());
-		}
-
-		@Override
-		public CompatibilityResult<TestCustomStateClass> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-			if (configSnapshot instanceof ParameterlessTypeSerializerConfig &&
-					((ParameterlessTypeSerializerConfig<?>) configSnapshot).getSerializationFormatIdentifier().equals(getClass().getName())) {
-
-				this.reconfigured = true;
-				return CompatibilityResult.compatible();
-			} else {
-				return CompatibilityResult.requiresMigration();
-			}
-		}
-	}
-
-	public static class TestReconfigurableCustomTypeSerializerPreUpgrade extends TestReconfigurableCustomTypeSerializer {}
-	public static class TestReconfigurableCustomTypeSerializerUpgraded extends TestReconfigurableCustomTypeSerializer {}
-
-	/**
 	 * We throw this in our {@link ExceptionThrowingTestSerializer}.
 	 */
 	private static class ExpectedKryoTestException extends RuntimeException {}


[flink] 02/05: [FLINK-11328] [cep] Snapshots of NFA-related serializers should be a CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ade21d13478d8d9cf00bd80f2e80729636505522
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 21 18:02:05 2019 +0100

    [FLINK-11328] [cep] Snapshots of NFA-related serializers should be a CompositeTypeSerializerSnapshot
---
 .../java/org/apache/flink/cep/nfa/DeweyNumber.java | 14 +---
 .../apache/flink/cep/nfa/NFAStateSerializer.java   |  2 +-
 .../apache/flink/cep/nfa/sharedbuffer/EventId.java | 16 ++---
 .../apache/flink/cep/nfa/sharedbuffer/NodeId.java  | 75 ++++++++++++++++----
 .../flink/cep/nfa/sharedbuffer/SharedBuffer.java   |  2 +-
 .../cep/nfa/sharedbuffer/SharedBufferEdge.java     | 80 ++++++++++++++++++----
 .../cep/nfa/sharedbuffer/SharedBufferNode.java     | 45 ++++++++++--
 7 files changed, 179 insertions(+), 55 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 53fffce..03881f3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -21,7 +21,6 @@ package org.apache.flink.cep.nfa;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -196,8 +195,6 @@ public class DeweyNumber implements Serializable {
 
 		private static final long serialVersionUID = -5086792497034943656L;
 
-		private final IntSerializer elemSerializer = IntSerializer.INSTANCE;
-
 		public static final DeweyNumberSerializer INSTANCE = new DeweyNumberSerializer();
 
 		private DeweyNumberSerializer() {}
@@ -232,7 +229,7 @@ public class DeweyNumber implements Serializable {
 			final int size = record.length();
 			target.writeInt(size);
 			for (int i = 0; i < size; i++) {
-				elemSerializer.serialize(record.deweyNumber[i], target);
+				target.writeInt(record.deweyNumber[i]);
 			}
 		}
 
@@ -241,7 +238,7 @@ public class DeweyNumber implements Serializable {
 			final int size = source.readInt();
 			int[] number = new int[size];
 			for (int i = 0; i < size; i++) {
-				number[i] = elemSerializer.deserialize(source);
+				number[i] = source.readInt();
 			}
 			return new DeweyNumber(number);
 		}
@@ -256,7 +253,7 @@ public class DeweyNumber implements Serializable {
 			final int size = source.readInt();
 			target.writeInt(size);
 			for (int i = 0; i < size; i++) {
-				elemSerializer.copy(source, target);
+				target.writeInt(source.readInt());
 			}
 		}
 
@@ -270,11 +267,6 @@ public class DeweyNumber implements Serializable {
 			return true;
 		}
 
-		@Override
-		public int hashCode() {
-			return elemSerializer.hashCode();
-		}
-
 		// -----------------------------------------------------------------------------------
 
 		@Override
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index ccbe25c..7b83030 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -95,7 +95,7 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 	private static final StringSerializer STATE_NAME_SERIALIZER = StringSerializer.INSTANCE;
 	private static final LongSerializer TIMESTAMP_SERIALIZER = LongSerializer.INSTANCE;
 	private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
-	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = NodeId.NodeIdSerializer.INSTANCE;
+	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = new NodeId.NodeIdSerializer();
 	private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;
 
 	@Override
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index 045cf38..3ac39b5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -21,8 +21,6 @@ package org.apache.flink.cep.nfa.sharedbuffer;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -117,19 +115,19 @@ public class EventId implements Comparable<EventId> {
 
 		@Override
 		public int getLength() {
-			return 2 * LongSerializer.INSTANCE.getLength();
+			return Integer.BYTES + Long.BYTES;
 		}
 
 		@Override
 		public void serialize(EventId record, DataOutputView target) throws IOException {
-			IntSerializer.INSTANCE.serialize(record.id, target);
-			LongSerializer.INSTANCE.serialize(record.timestamp, target);
+			target.writeInt(record.id);
+			target.writeLong(record.timestamp);
 		}
 
 		@Override
 		public EventId deserialize(DataInputView source) throws IOException {
-			int id = IntSerializer.INSTANCE.deserialize(source);
-			long timestamp = LongSerializer.INSTANCE.deserialize(source);
+			int id = source.readInt();
+			long timestamp = source.readLong();
 
 			return new EventId(id, timestamp);
 		}
@@ -141,8 +139,8 @@ public class EventId implements Comparable<EventId> {
 
 		@Override
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			IntSerializer.INSTANCE.copy(source, target);
-			LongSerializer.INSTANCE.copy(source, target);
+			target.writeInt(source.readInt());
+			target.writeLong(source.readLong());
 		}
 
 		@Override
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
index 87dc2c3..2693d4b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.StringValue;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Unique identifier for {@link SharedBufferNode}.
  */
@@ -81,9 +84,19 @@ public class NodeId {
 
 		private static final long serialVersionUID = 9209498028181378582L;
 
-		public static final NodeIdSerializer INSTANCE = new NodeIdSerializer();
+		/**
+		 * NOTE: this field should actually be final.
+		 * The reason that it isn't final is due to backward compatible deserialization
+		 * paths. See {@link #readObject(ObjectInputStream)}.
+		 */
+		private TypeSerializer<EventId> eventIdSerializer;
+
+		public NodeIdSerializer() {
+			this(EventId.EventIdSerializer.INSTANCE);
+		}
 
-		private NodeIdSerializer() {
+		private NodeIdSerializer(TypeSerializer<EventId> eventIdSerializer) {
+			this.eventIdSerializer = checkNotNull(eventIdSerializer);
 		}
 
 		@Override
@@ -115,8 +128,8 @@ public class NodeId {
 		public void serialize(NodeId record, DataOutputView target) throws IOException {
 			if (record != null) {
 				target.writeByte(1);
-				EventId.EventIdSerializer.INSTANCE.serialize(record.eventId, target);
-				StringSerializer.INSTANCE.serialize(record.pageName, target);
+				eventIdSerializer.serialize(record.eventId, target);
+				StringValue.writeString(record.pageName, target);
 			} else {
 				target.writeByte(0);
 			}
@@ -129,8 +142,8 @@ public class NodeId {
 				return null;
 			}
 
-			EventId eventId = EventId.EventIdSerializer.INSTANCE.deserialize(source);
-			String pageName = StringSerializer.INSTANCE.deserialize(source);
+			EventId eventId = eventIdSerializer.deserialize(source);
+			String pageName = StringValue.readString(source);
 			return new NodeId(eventId, pageName);
 		}
 
@@ -143,9 +156,8 @@ public class NodeId {
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
 			target.writeByte(source.readByte());
 
-			LongSerializer.INSTANCE.copy(source, target); // eventId
-			LongSerializer.INSTANCE.copy(source, target); // timestamp
-			StringSerializer.INSTANCE.copy(source, target); // pageName
+			eventIdSerializer.copy(source, target);
+			StringValue.copyString(source, target);
 		}
 
 		@Override
@@ -157,17 +169,50 @@ public class NodeId {
 
 		@Override
 		public TypeSerializerSnapshot<NodeId> snapshotConfiguration() {
-			return new NodeIdSerializerSnapshot();
+			return new NodeIdSerializerSnapshot(this);
 		}
 
 		/**
 		 * Serializer configuration snapshot for compatibility and format evolution.
 		 */
 		@SuppressWarnings("WeakerAccess")
-		public static final class NodeIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<NodeId> {
+		public static final class NodeIdSerializerSnapshot extends CompositeTypeSerializerSnapshot<NodeId, NodeIdSerializer> {
+
+			private static final int VERSION = 1;
 
 			public NodeIdSerializerSnapshot() {
-				super(() -> INSTANCE);
+				super(NodeIdSerializer.class);
+			}
+
+			public NodeIdSerializerSnapshot(NodeIdSerializer nodeIdSerializer) {
+				super(nodeIdSerializer);
+			}
+
+			@Override
+			protected int getCurrentOuterSnapshotVersion() {
+				return VERSION;
+			}
+
+			@Override
+			protected NodeIdSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+				return new NodeIdSerializer((EventId.EventIdSerializer) nestedSerializers[0]);
+			}
+
+			@Override
+			protected TypeSerializer<?>[] getNestedSerializers(NodeIdSerializer outerSerializer) {
+				return new TypeSerializer<?>[]{ outerSerializer.eventIdSerializer };
+			}
+		}
+
+		// ------------------------------------------------------------------------
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			in.defaultReadObject();
+
+			if (eventIdSerializer == null) {
+				// the nested serializer will be null if this was read from a savepoint taken with versions
+				// lower than Flink 1.7; in this case, we explicitly create an instance for the nested serializer.
+				this.eventIdSerializer = EventId.EventIdSerializer.INSTANCE;
 			}
 		}
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
index 5a37897..19f9d2a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -78,7 +78,7 @@ public class SharedBuffer<V> {
 		this.entries = stateStore.getMapState(
 			new MapStateDescriptor<>(
 				entriesStateName,
-				NodeId.NodeIdSerializer.INSTANCE,
+				new NodeId.NodeIdSerializer(),
 				new Lockable.LockableTypeSerializer<>(new SharedBufferNode.SharedBufferNodeSerializer())));
 
 		this.eventsCount = stateStore.getMapState(
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
index 2af92f5..d6a95f6 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.cep.nfa.DeweyNumber;
@@ -26,6 +27,9 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Versioned edge in {@link SharedBuffer} that allows retrieving predecessors.
@@ -67,9 +71,24 @@ public class SharedBufferEdge {
 
 		private static final long serialVersionUID = -5122474955050663979L;
 
-		static final SharedBufferEdgeSerializer INSTANCE = new SharedBufferEdgeSerializer();
+		/**
+		 * NOTE: these serializer fields should actually be final.
+		 * The reason that they aren't final is due to backward compatible deserialization
+		 * paths. See {@link #readObject(ObjectInputStream)}.
+		 */
+		private TypeSerializer<NodeId> nodeIdSerializer;
+		private TypeSerializer<DeweyNumber> deweyNumberSerializer;
+
+		public SharedBufferEdgeSerializer() {
+			this(new NodeId.NodeIdSerializer(), DeweyNumber.DeweyNumberSerializer.INSTANCE);
+		}
 
-		private SharedBufferEdgeSerializer() {}
+		private SharedBufferEdgeSerializer(
+				TypeSerializer<NodeId> nodeIdSerializer,
+				TypeSerializer<DeweyNumber> deweyNumberSerializer) {
+			this.nodeIdSerializer = checkNotNull(nodeIdSerializer);
+			this.deweyNumberSerializer = checkNotNull(deweyNumberSerializer);
+		}
 
 		@Override
 		public boolean isImmutableType() {
@@ -98,14 +117,14 @@ public class SharedBufferEdge {
 
 		@Override
 		public void serialize(SharedBufferEdge record, DataOutputView target) throws IOException {
-			NodeId.NodeIdSerializer.INSTANCE.serialize(record.target, target);
-			DeweyNumber.DeweyNumberSerializer.INSTANCE.serialize(record.deweyNumber, target);
+			nodeIdSerializer.serialize(record.target, target);
+			deweyNumberSerializer.serialize(record.deweyNumber, target);
 		}
 
 		@Override
 		public SharedBufferEdge deserialize(DataInputView source) throws IOException {
-			NodeId target = NodeId.NodeIdSerializer.INSTANCE.deserialize(source);
-			DeweyNumber deweyNumber = DeweyNumber.DeweyNumberSerializer.INSTANCE.deserialize(source);
+			NodeId target = nodeIdSerializer.deserialize(source);
+			DeweyNumber deweyNumber = deweyNumberSerializer.deserialize(source);
 			return new SharedBufferEdge(target, deweyNumber);
 		}
 
@@ -116,8 +135,8 @@ public class SharedBufferEdge {
 
 		@Override
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			NodeId.NodeIdSerializer.INSTANCE.copy(source, target);
-			DeweyNumber.DeweyNumberSerializer.INSTANCE.copy(source, target);
+			nodeIdSerializer.copy(source, target);
+			deweyNumberSerializer.copy(source, target);
 		}
 
 		@Override
@@ -129,17 +148,54 @@ public class SharedBufferEdge {
 
 		@Override
 		public TypeSerializerSnapshot<SharedBufferEdge> snapshotConfiguration() {
-			return new SharedBufferEdgeSerializerSnapshot();
+			return new SharedBufferEdgeSerializerSnapshot(this);
 		}
 
 		/**
 		 * Serializer configuration snapshot for compatibility and format evolution.
 		 */
 		@SuppressWarnings("WeakerAccess")
-		public static final class SharedBufferEdgeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferEdge> {
+		public static final class SharedBufferEdgeSerializerSnapshot
+				extends CompositeTypeSerializerSnapshot<SharedBufferEdge, SharedBufferEdgeSerializer> {
+
+			private static final int VERSION = 1;
 
 			public SharedBufferEdgeSerializerSnapshot() {
-				super(() -> INSTANCE);
+				super(SharedBufferEdgeSerializer.class);
+			}
+
+			public SharedBufferEdgeSerializerSnapshot(SharedBufferEdgeSerializer sharedBufferEdgeSerializer) {
+				super(sharedBufferEdgeSerializer);
+			}
+
+			@Override
+			protected int getCurrentOuterSnapshotVersion() {
+				return VERSION;
+			}
+
+			@Override
+			protected SharedBufferEdgeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+				return new SharedBufferEdgeSerializer(
+					(NodeId.NodeIdSerializer) nestedSerializers[0],
+					(DeweyNumber.DeweyNumberSerializer) nestedSerializers[1]);
+			}
+
+			@Override
+			protected TypeSerializer<?>[] getNestedSerializers(SharedBufferEdgeSerializer outerSerializer) {
+				return new TypeSerializer<?>[] { outerSerializer.nodeIdSerializer, outerSerializer.deweyNumberSerializer };
+			}
+		}
+
+		// ------------------------------------------------------------------------
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			in.defaultReadObject();
+
+			if (nodeIdSerializer == null) {
+				// the nested serializers will be null if this was read from a savepoint taken with versions
+				// lower than Flink 1.7; in this case, we explicitly create instances for the nested serializers
+				this.nodeIdSerializer = new NodeId.NodeIdSerializer();
+				this.deweyNumberSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
 			}
 		}
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
index 96afdbb..473a342 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -30,6 +31,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An entry in {@link SharedBuffer} that allows to store relations between different entries.
  */
@@ -65,8 +68,15 @@ public class SharedBufferNode {
 
 		private static final long serialVersionUID = -6687780732295439832L;
 
-		private final ListSerializer<SharedBufferEdge> edgesSerializer =
-			new ListSerializer<>(SharedBufferEdgeSerializer.INSTANCE);
+		private final ListSerializer<SharedBufferEdge> edgesSerializer;
+
+		public SharedBufferNodeSerializer() {
+			this.edgesSerializer = new ListSerializer<>(new SharedBufferEdgeSerializer());
+		}
+
+		private SharedBufferNodeSerializer(ListSerializer<SharedBufferEdge> edgesSerializer) {
+			this.edgesSerializer = checkNotNull(edgesSerializer);
+		}
 
 		@Override
 		public boolean isImmutableType() {
@@ -123,17 +133,40 @@ public class SharedBufferNode {
 
 		@Override
 		public TypeSerializerSnapshot<SharedBufferNode> snapshotConfiguration() {
-			return new SharedBufferNodeSerializerSnapshot();
+			return new SharedBufferNodeSerializerSnapshot(this);
 		}
 
 		/**
 		 * Serializer configuration snapshot for compatibility and format evolution.
 		 */
 		@SuppressWarnings("WeakerAccess")
-		public static final class SharedBufferNodeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferNode> {
+		public static final class SharedBufferNodeSerializerSnapshot
+				extends CompositeTypeSerializerSnapshot<SharedBufferNode, SharedBufferNodeSerializer> {
+
+			private static final int VERSION = 1;
 
 			public SharedBufferNodeSerializerSnapshot() {
-				super(SharedBufferNodeSerializer::new);
+				super(SharedBufferNodeSerializer.class);
+			}
+
+			public SharedBufferNodeSerializerSnapshot(SharedBufferNodeSerializer sharedBufferNodeSerializer) {
+				super(sharedBufferNodeSerializer);
+			}
+
+			@Override
+			protected int getCurrentOuterSnapshotVersion() {
+				return VERSION;
+			}
+
+			@Override
+			@SuppressWarnings("unchecked")
+			protected SharedBufferNodeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+				return new SharedBufferNodeSerializer((ListSerializer<SharedBufferEdge>) nestedSerializers[0]);
+			}
+
+			@Override
+			protected TypeSerializer<?>[] getNestedSerializers(SharedBufferNodeSerializer outerSerializer) {
+				return new TypeSerializer<?>[]{ outerSerializer.edgesSerializer };
 			}
 		}
 	}


[flink] 05/05: [FLINK-11328] [tests] Add serializer migration tests for all parameterless serializers

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c73f2b5ddfec39878accefa4447c648de0be248f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Fri Jan 18 15:01:19 2019 +0100

    [FLINK-11328] [tests] Add serializer migration tests for all parameterless serializers
    
    This closes #7553.
---
 .../KafkaSerializerSnapshotsMigrationTest.java     |  59 +++++++++++++
 .../flink-1.6-context-state-serializer-data        | Bin 0 -> 270 bytes
 .../flink-1.6-context-state-serializer-snapshot    | Bin 0 -> 452 bytes
 .../flink-1.6-transaction-state-serializer-data    | Bin 0 -> 110 bytes
 ...flink-1.6-transaction-state-serializer-snapshot | Bin 0 -> 460 bytes
 .../flink-1.7-context-state-serializer-data        | Bin 0 -> 270 bytes
 .../flink-1.7-context-state-serializer-snapshot    | Bin 0 -> 440 bytes
 .../flink-1.7-transaction-state-serializer-data    | Bin 0 -> 110 bytes
 ...flink-1.7-transaction-state-serializer-snapshot | Bin 0 -> 448 bytes
 .../KafkaSerializerSnapshotsMigrationTest.java     |  59 +++++++++++++
 .../flink-1.7-context-state-serializer-data        | Bin 0 -> 270 bytes
 .../flink-1.7-context-state-serializer-snapshot    | Bin 0 -> 434 bytes
 .../flink-1.7-transaction-state-serializer-data    | Bin 0 -> 110 bytes
 ...flink-1.7-transaction-state-serializer-snapshot | Bin 0 -> 442 bytes
 .../cep/NFASerializerSnapshotsMigrationTest.java   |  78 +++++++++++++++++
 .../flink-1.6-dewey-number-serializer-data         | Bin 0 -> 80 bytes
 .../flink-1.6-dewey-number-serializer-snapshot     | Bin 0 -> 547 bytes
 .../resources/flink-1.6-event-id-serializer-data   | Bin 0 -> 120 bytes
 .../flink-1.6-event-id-serializer-snapshot         | Bin 0 -> 402 bytes
 .../resources/flink-1.6-node-id-serializer-data    | Bin 0 -> 250 bytes
 .../flink-1.6-node-id-serializer-snapshot          | Bin 0 -> 398 bytes
 .../flink-1.6-shared-buffer-edge-serializer-data   | Bin 0 -> 330 bytes
 ...link-1.6-shared-buffer-edge-serializer-snapshot | Bin 0 -> 438 bytes
 .../flink-1.6-shared-buffer-node-serializer-data   | Bin 0 -> 370 bytes
 ...link-1.6-shared-buffer-node-serializer-snapshot | Bin 0 -> 775 bytes
 .../flink-1.7-dewey-number-serializer-data         | Bin 0 -> 80 bytes
 .../flink-1.7-dewey-number-serializer-snapshot     | Bin 0 -> 535 bytes
 .../resources/flink-1.7-event-id-serializer-data   | Bin 0 -> 120 bytes
 .../flink-1.7-event-id-serializer-snapshot         | Bin 0 -> 390 bytes
 .../resources/flink-1.7-node-id-serializer-data    | Bin 0 -> 250 bytes
 .../flink-1.7-node-id-serializer-snapshot          | Bin 0 -> 386 bytes
 .../flink-1.7-shared-buffer-edge-serializer-data   | Bin 0 -> 330 bytes
 ...link-1.7-shared-buffer-edge-serializer-snapshot | Bin 0 -> 426 bytes
 .../flink-1.7-shared-buffer-node-serializer-data   | Bin 0 -> 370 bytes
 ...link-1.7-shared-buffer-node-serializer-snapshot | Bin 0 -> 763 bytes
 flink-libraries/flink-gelly-examples/pom.xml       |   8 ++
 ...perHashCodeSerializerSnapshotMigrationTest.java |  55 ++++++++++++
 ...ong-value-with-proper-hash-code-serializer-data | Bin 0 -> 80 bytes
 ...value-with-proper-hash-code-serializer-snapshot | Bin 0 -> 488 bytes
 ...ong-value-with-proper-hash-code-serializer-data | Bin 0 -> 80 bytes
 ...value-with-proper-hash-code-serializer-snapshot | Bin 0 -> 476 bytes
 .../ValueArraySerializerSnapshotMigrationTest.java |  93 +++++++++++++++++++++
 .../flink-1.6-byte-value-array-serializer-data     | Bin 0 -> 70 bytes
 .../flink-1.6-byte-value-array-serializer-snapshot | Bin 0 -> 404 bytes
 .../flink-1.6-char-value-array-serializer-data     | Bin 0 -> 240 bytes
 .../flink-1.6-char-value-array-serializer-snapshot | Bin 0 -> 404 bytes
 .../flink-1.6-double-value-array-serializer-data   | Bin 0 -> 280 bytes
 ...link-1.6-double-value-array-serializer-snapshot | Bin 0 -> 408 bytes
 .../flink-1.6-float-value-array-serializer-data    | Bin 0 -> 160 bytes
 ...flink-1.6-float-value-array-serializer-snapshot | Bin 0 -> 406 bytes
 .../flink-1.6-int-value-array-serializer-data      | Bin 0 -> 160 bytes
 .../flink-1.6-int-value-array-serializer-snapshot  | Bin 0 -> 402 bytes
 .../flink-1.6-long-value-array-serializer-data     | Bin 0 -> 280 bytes
 .../flink-1.6-long-value-array-serializer-snapshot | Bin 0 -> 404 bytes
 .../flink-1.6-null-value-array-serializer-data     | Bin 0 -> 40 bytes
 .../flink-1.6-null-value-array-serializer-snapshot | Bin 0 -> 404 bytes
 .../flink-1.6-short-value-array-serializer-data    | Bin 0 -> 100 bytes
 ...flink-1.6-short-value-array-serializer-snapshot | Bin 0 -> 406 bytes
 .../flink-1.6-string-value-array-serializer-data   | Bin 0 -> 280 bytes
 ...link-1.6-string-value-array-serializer-snapshot | Bin 0 -> 408 bytes
 .../resources/flink-1.7-array-list-serializer-data | Bin 0 -> 240 bytes
 .../flink-1.7-array-list-serializer-snapshot       | Bin 0 -> 231 bytes
 ...k-1.7-avro-generic-type-serializer-address-data | Bin 0 -> 240 bytes
 ...7-avro-generic-type-serializer-address-snapshot | Bin 0 -> 370 bytes
 .../flink-1.7-avro-type-serializer-address-data    | Bin 0 -> 240 bytes
 ...flink-1.7-avro-type-serializer-address-snapshot | Bin 0 -> 380 bytes
 .../flink-1.7-byte-value-array-serializer-data     | Bin 0 -> 70 bytes
 .../flink-1.7-byte-value-array-serializer-snapshot | Bin 0 -> 392 bytes
 .../flink-1.7-char-value-array-serializer-data     | Bin 0 -> 240 bytes
 .../flink-1.7-char-value-array-serializer-snapshot | Bin 0 -> 392 bytes
 .../flink-1.7-double-value-array-serializer-data   | Bin 0 -> 280 bytes
 ...link-1.7-double-value-array-serializer-snapshot | Bin 0 -> 396 bytes
 .../resources/flink-1.7-either-serializer-data     |   1 +
 .../resources/flink-1.7-either-serializer-snapshot | Bin 0 -> 383 bytes
 .../flink-1.7-float-value-array-serializer-data    | Bin 0 -> 160 bytes
 ...flink-1.7-float-value-array-serializer-snapshot | Bin 0 -> 394 bytes
 .../flink-1.7-int-value-array-serializer-data      | Bin 0 -> 160 bytes
 .../flink-1.7-int-value-array-serializer-snapshot  | Bin 0 -> 390 bytes
 .../flink-1.7-long-value-array-serializer-data     | Bin 0 -> 280 bytes
 .../flink-1.7-long-value-array-serializer-snapshot | Bin 0 -> 392 bytes
 .../flink-1.7-null-value-array-serializer-data     | Bin 0 -> 40 bytes
 .../flink-1.7-null-value-array-serializer-snapshot | Bin 0 -> 392 bytes
 .../flink-1.7-short-value-array-serializer-data    | Bin 0 -> 100 bytes
 ...flink-1.7-short-value-array-serializer-snapshot | Bin 0 -> 394 bytes
 .../flink-1.7-string-value-array-serializer-data   | Bin 0 -> 280 bytes
 ...link-1.7-string-value-array-serializer-snapshot | Bin 0 -> 396 bytes
 .../state/JavaSerializerSnapshotMigrationTest.java |  57 +++++++++++++
 ...dNamespacieSerializerSnapshotMigrationTest.java |  55 ++++++++++++
 .../test/resources/flink-1.6-java-serializer-data  | Bin 0 -> 180 bytes
 .../resources/flink-1.6-java-serializer-snapshot   | Bin 0 -> 366 bytes
 .../flink-1.6-void-namespace-serializer-data       | Bin 0 -> 10 bytes
 .../flink-1.6-void-namespace-serializer-snapshot   | Bin 0 -> 384 bytes
 .../test/resources/flink-1.7-java-serializer-data  | Bin 0 -> 180 bytes
 .../resources/flink-1.7-java-serializer-snapshot   | Bin 0 -> 354 bytes
 .../flink-1.7-void-namespace-serializer-data       | Bin 0 -> 10 bytes
 .../flink-1.7-void-namespace-serializer-snapshot   | Bin 0 -> 372 bytes
 .../WindowSerializerSnapshotsMigrationTest.java    |  60 +++++++++++++
 .../flink-1.6-global-window-serializer-data        | Bin 0 -> 10 bytes
 .../flink-1.6-global-window-serializer-snapshot    | Bin 0 -> 420 bytes
 .../flink-1.6-time-window-serializer-data          | Bin 0 -> 160 bytes
 .../flink-1.6-time-window-serializer-snapshot      | Bin 0 -> 416 bytes
 .../flink-1.7-global-window-serializer-data        | Bin 0 -> 10 bytes
 .../flink-1.7-global-window-serializer-snapshot    | Bin 0 -> 408 bytes
 .../flink-1.7-time-window-serializer-data          | Bin 0 -> 160 bytes
 .../flink-1.7-time-window-serializer-snapshot      | Bin 0 -> 404 bytes
 105 files changed, 525 insertions(+)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerSnapshotsMigrationTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerSnapshotsMigrationTest.java
new file mode 100644
index 0000000..85272c8
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerSnapshotsMigrationTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration tests for the {@link FlinkKafkaProducer011.TransactionStateSerializer}
+ * and {@link FlinkKafkaProducer011.ContextStateSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class KafkaSerializerSnapshotsMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Object> {
+
+	public KafkaSerializerSnapshotsMigrationTest(TestSpecification<Object> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"transaction-state-serializer",
+			FlinkKafkaProducer011.TransactionStateSerializer.class,
+			FlinkKafkaProducer011.TransactionStateSerializer.TransactionStateSerializerSnapshot.class,
+			FlinkKafkaProducer011.TransactionStateSerializer::new);
+		testSpecifications.add(
+			"context-state-serializer",
+			FlinkKafkaProducer011.ContextStateSerializer.class,
+			FlinkKafkaProducer011.ContextStateSerializer.ContextStateSerializerSnapshot.class,
+			FlinkKafkaProducer011.ContextStateSerializer::new);
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-context-state-serializer-data b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-context-state-serializer-data
new file mode 100644
index 0000000..e7d439a
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-context-state-serializer-data differ
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-context-state-serializer-snapshot b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-context-state-serializer-snapshot
new file mode 100644
index 0000000..f6f36e7
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-context-state-serializer-snapshot differ
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-transaction-state-serializer-data b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-transaction-state-serializer-data
new file mode 100644
index 0000000..2476af5
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-transaction-state-serializer-data differ
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-transaction-state-serializer-snapshot b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-transaction-state-serializer-snapshot
new file mode 100644
index 0000000..87182ea
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.6-transaction-state-serializer-snapshot differ
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-context-state-serializer-data b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-context-state-serializer-data
new file mode 100644
index 0000000..e7d439a
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-context-state-serializer-data differ
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-context-state-serializer-snapshot b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-context-state-serializer-snapshot
new file mode 100644
index 0000000..870f4b4
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-context-state-serializer-snapshot differ
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-transaction-state-serializer-data b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-transaction-state-serializer-data
new file mode 100644
index 0000000..2476af5
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-transaction-state-serializer-data differ
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-transaction-state-serializer-snapshot b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-transaction-state-serializer-snapshot
new file mode 100644
index 0000000..52370eb
Binary files /dev/null and b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/flink-1.7-transaction-state-serializer-snapshot differ
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerSnapshotsMigrationTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerSnapshotsMigrationTest.java
new file mode 100644
index 0000000..ac69205
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerSnapshotsMigrationTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Parameterized migration tests for the snapshots of {@link FlinkKafkaProducer.TransactionStateSerializer}
+ * and {@link FlinkKafkaProducer.ContextStateSerializer}; specifications are declared in {@link #testSpecifications()}.
+ */
+@RunWith(Parameterized.class)
+public class KafkaSerializerSnapshotsMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Object> {
+
+	public KafkaSerializerSnapshotsMigrationTest(TestSpecification<Object> testSpecification) {
+		super(testSpecification); // the specification comes from the Parameterized runner; all test logic lives in the base class
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_7); // only v1_7 is declared; the resources added with this test are flink-1.7-* only
+
+		testSpecifications.add(
+			"transaction-state-serializer", // presumably the resource name stem (flink-1.7-<name>-data / -snapshot) - confirm in base class
+			FlinkKafkaProducer.TransactionStateSerializer.class,
+			FlinkKafkaProducer.TransactionStateSerializer.TransactionStateSerializerSnapshot.class,
+			FlinkKafkaProducer.TransactionStateSerializer::new);
+		testSpecifications.add(
+			"context-state-serializer", // same argument layout as above: name, serializer class, snapshot class, serializer supplier
+			FlinkKafkaProducer.ContextStateSerializer.class,
+			FlinkKafkaProducer.ContextStateSerializer.ContextStateSerializerSnapshot.class,
+			FlinkKafkaProducer.ContextStateSerializer::new);
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-context-state-serializer-data b/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-context-state-serializer-data
new file mode 100644
index 0000000..e7d439a
Binary files /dev/null and b/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-context-state-serializer-data differ
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-context-state-serializer-snapshot b/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-context-state-serializer-snapshot
new file mode 100644
index 0000000..14f0dd4
Binary files /dev/null and b/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-context-state-serializer-snapshot differ
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-transaction-state-serializer-data b/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-transaction-state-serializer-data
new file mode 100644
index 0000000..2476af5
Binary files /dev/null and b/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-transaction-state-serializer-data differ
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-transaction-state-serializer-snapshot b/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-transaction-state-serializer-snapshot
new file mode 100644
index 0000000..35063ba
Binary files /dev/null and b/flink-connectors/flink-connector-kafka/src/test/resources/flink-1.7-transaction-state-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java
new file mode 100644
index 0000000..0d8d999
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Parameterized migration tests for the CEP NFA-related serializers of {@link EventId}, {@link NodeId}, {@link DeweyNumber}, {@link SharedBufferEdge} and {@link SharedBufferNode}.
+ */
+@RunWith(Parameterized.class)
+public class NFASerializerSnapshotsMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Object> {
+
+	public NFASerializerSnapshotsMigrationTest(TestSpecification<Object> testSpecification) {
+		super(testSpecification); // the specification comes from the Parameterized runner; all test logic lives in the base class
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); // declared versions; flink-1.6-* and flink-1.7-* resources are added with this test
+
+		testSpecifications.add(
+			"event-id-serializer", // presumably the resource name stem (flink-<version>-<name>-data / -snapshot) - confirm in base class
+			EventId.EventIdSerializer.class,
+			EventId.EventIdSerializer.EventIdSerializerSnapshot.class,
+			() -> EventId.EventIdSerializer.INSTANCE); // supplies the shared INSTANCE rather than constructing a new serializer
+		testSpecifications.add(
+			"node-id-serializer",
+			NodeId.NodeIdSerializer.class,
+			NodeId.NodeIdSerializer.NodeIdSerializerSnapshot.class,
+			NodeId.NodeIdSerializer::new);
+		testSpecifications.add(
+			"dewey-number-serializer",
+			DeweyNumber.DeweyNumberSerializer.class,
+			DeweyNumber.DeweyNumberSerializer.DeweyNumberSerializerSnapshot.class,
+			() -> DeweyNumber.DeweyNumberSerializer.INSTANCE); // supplies the shared INSTANCE rather than constructing a new serializer
+		testSpecifications.add(
+			"shared-buffer-edge-serializer",
+			SharedBufferEdge.SharedBufferEdgeSerializer.class,
+			SharedBufferEdge.SharedBufferEdgeSerializer.SharedBufferEdgeSerializerSnapshot.class,
+			SharedBufferEdge.SharedBufferEdgeSerializer::new);
+		testSpecifications.add(
+			"shared-buffer-node-serializer",
+			SharedBufferNode.SharedBufferNodeSerializer.class,
+			SharedBufferNode.SharedBufferNodeSerializer.SharedBufferNodeSerializerSnapshot.class,
+			SharedBufferNode.SharedBufferNodeSerializer::new);
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-dewey-number-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.6-dewey-number-serializer-data
new file mode 100644
index 0000000..75d3d21
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-dewey-number-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-dewey-number-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.6-dewey-number-serializer-snapshot
new file mode 100644
index 0000000..027167e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-dewey-number-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-event-id-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.6-event-id-serializer-data
new file mode 100644
index 0000000..840f83d
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-event-id-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-event-id-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.6-event-id-serializer-snapshot
new file mode 100644
index 0000000..0d1a700
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-event-id-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-node-id-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.6-node-id-serializer-data
new file mode 100644
index 0000000..7a4f46e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-node-id-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-node-id-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.6-node-id-serializer-snapshot
new file mode 100644
index 0000000..4f9053e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-node-id-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-edge-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-edge-serializer-data
new file mode 100644
index 0000000..5979627
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-edge-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-edge-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-edge-serializer-snapshot
new file mode 100644
index 0000000..af0016b
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-edge-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-node-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-node-serializer-data
new file mode 100644
index 0000000..8fe40f6
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-node-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-node-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-node-serializer-snapshot
new file mode 100644
index 0000000..3071b83
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-shared-buffer-node-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-dewey-number-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.7-dewey-number-serializer-data
new file mode 100644
index 0000000..75d3d21
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-dewey-number-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-dewey-number-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.7-dewey-number-serializer-snapshot
new file mode 100644
index 0000000..3dae669
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-dewey-number-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-event-id-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.7-event-id-serializer-data
new file mode 100644
index 0000000..5e4f6bf
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-event-id-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-event-id-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.7-event-id-serializer-snapshot
new file mode 100644
index 0000000..c824699
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-event-id-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-node-id-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.7-node-id-serializer-data
new file mode 100644
index 0000000..ed21712
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-node-id-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-node-id-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.7-node-id-serializer-snapshot
new file mode 100644
index 0000000..ba4c1c7
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-node-id-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-edge-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-edge-serializer-data
new file mode 100644
index 0000000..3546983
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-edge-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-edge-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-edge-serializer-snapshot
new file mode 100644
index 0000000..db33009
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-edge-serializer-snapshot differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-node-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-node-serializer-data
new file mode 100644
index 0000000..fb81606
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-node-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-node-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-node-serializer-snapshot
new file mode 100644
index 0000000..fab3567
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.7-shared-buffer-node-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index a3e9661..d7253ac 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -110,6 +110,14 @@
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCodeSerializerSnapshotMigrationTest.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCodeSerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..d996f22
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCodeSerializerSnapshotMigrationTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.transform;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Parameterized migration test for the snapshot of {@link LongValueWithProperHashCode.LongValueWithProperHashCodeSerializer}; its single specification is declared in {@link #testSpecifications()}.
+ */
+@RunWith(Parameterized.class)
+public class LongValueWithProperHashCodeSerializerSnapshotMigrationTest
+		extends TypeSerializerSnapshotMigrationTestBase<LongValueWithProperHashCode> {
+
+	public LongValueWithProperHashCodeSerializerSnapshotMigrationTest(
+			TestSpecification<LongValueWithProperHashCode> testSpecification) {
+		super(testSpecification); // the specification comes from the Parameterized runner; all test logic lives in the base class
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); // declared versions; flink-1.6-* and flink-1.7-* resources are added with this test
+
+		testSpecifications.add(
+			"long-value-with-proper-hash-code-serializer", // presumably the resource name stem (flink-<version>-<name>-data / -snapshot) - confirm in base class
+			LongValueWithProperHashCode.LongValueWithProperHashCodeSerializer.class,
+			LongValueWithProperHashCode.LongValueWithProperHashCodeSerializer.LongValueWithProperHashCodeSerializerSnapshot.class,
+			LongValueWithProperHashCode.LongValueWithProperHashCodeSerializer::new);
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.6-long-value-with-proper-hash-code-serializer-data b/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.6-long-value-with-proper-hash-code-serializer-data
new file mode 100644
index 0000000..f3e06a4
Binary files /dev/null and b/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.6-long-value-with-proper-hash-code-serializer-data differ
diff --git a/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.6-long-value-with-proper-hash-code-serializer-snapshot b/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.6-long-value-with-proper-hash-code-serializer-snapshot
new file mode 100644
index 0000000..5aaf88b
Binary files /dev/null and b/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.6-long-value-with-proper-hash-code-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.7-long-value-with-proper-hash-code-serializer-data b/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.7-long-value-with-proper-hash-code-serializer-data
new file mode 100644
index 0000000..f3e06a4
Binary files /dev/null and b/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.7-long-value-with-proper-hash-code-serializer-data differ
diff --git a/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.7-long-value-with-proper-hash-code-serializer-snapshot b/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.7-long-value-with-proper-hash-code-serializer-snapshot
new file mode 100644
index 0000000..28104ca
Binary files /dev/null and b/flink-libraries/flink-gelly-examples/src/test/resources/flink-1.7-long-value-with-proper-hash-code-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArraySerializerSnapshotMigrationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArraySerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..e9f0be2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/types/valuearray/ValueArraySerializerSnapshotMigrationTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.types.valuearray;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Parameterized migration tests for the snapshots of this package's {@code *ValueArraySerializer} classes (byte, char, double, float, int, long, null, short and string variants).
+ */
+@RunWith(Parameterized.class)
+public class ValueArraySerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Object> {
+
+	public ValueArraySerializerSnapshotMigrationTest(TestSpecification<Object> testSpecification) {
+		super(testSpecification); // the specification comes from the Parameterized runner; all test logic lives in the base class
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); // versions the specifications below are declared against
+
+		testSpecifications.add(
+			"byte-value-array-serializer", // presumably the resource name stem (flink-<version>-<name>-data / -snapshot) - confirm in base class
+			ByteValueArraySerializer.class,
+			ByteValueArraySerializer.ByteValueArraySerializerSnapshot.class,
+			ByteValueArraySerializer::new);
+		testSpecifications.add(
+			"char-value-array-serializer", // every entry below follows the same layout: name, serializer class, snapshot class, serializer supplier
+			CharValueArraySerializer.class,
+			CharValueArraySerializer.CharValueArraySerializerSnapshot.class,
+			CharValueArraySerializer::new);
+		testSpecifications.add(
+			"double-value-array-serializer",
+			DoubleValueArraySerializer.class,
+			DoubleValueArraySerializer.DoubleValueArraySerializerSnapshot.class,
+			DoubleValueArraySerializer::new);
+		testSpecifications.add(
+			"float-value-array-serializer",
+			FloatValueArraySerializer.class,
+			FloatValueArraySerializer.FloatValueArraySerializerSnapshot.class,
+			FloatValueArraySerializer::new);
+		testSpecifications.add(
+			"int-value-array-serializer",
+			IntValueArraySerializer.class,
+			IntValueArraySerializer.IntValueArraySerializerSnapshot.class,
+			IntValueArraySerializer::new);
+		testSpecifications.add(
+			"long-value-array-serializer",
+			LongValueArraySerializer.class,
+			LongValueArraySerializer.LongValueArraySerializerSnapshot.class,
+			LongValueArraySerializer::new);
+		testSpecifications.add(
+			"null-value-array-serializer",
+			NullValueArraySerializer.class,
+			NullValueArraySerializer.NullValueArraySerializerSnapshot.class,
+			NullValueArraySerializer::new);
+		testSpecifications.add(
+			"short-value-array-serializer",
+			ShortValueArraySerializer.class,
+			ShortValueArraySerializer.ShortValueArraySerializerSnapshot.class,
+			ShortValueArraySerializer::new);
+		testSpecifications.add(
+			"string-value-array-serializer",
+			StringValueArraySerializer.class,
+			StringValueArraySerializer.StringValueArraySerializerSnapshot.class,
+			StringValueArraySerializer::new);
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-byte-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-byte-value-array-serializer-data
new file mode 100644
index 0000000..a03b9c2
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-byte-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-byte-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-byte-value-array-serializer-snapshot
new file mode 100644
index 0000000..f23bb4d
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-byte-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-char-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-char-value-array-serializer-data
new file mode 100644
index 0000000..918413b
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-char-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-char-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-char-value-array-serializer-snapshot
new file mode 100644
index 0000000..efec125
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-char-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-double-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-double-value-array-serializer-data
new file mode 100644
index 0000000..a144299
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-double-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-double-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-double-value-array-serializer-snapshot
new file mode 100644
index 0000000..fce23b9
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-double-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-float-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-float-value-array-serializer-data
new file mode 100644
index 0000000..dc10ffb
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-float-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-float-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-float-value-array-serializer-snapshot
new file mode 100644
index 0000000..a2cc5d9
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-float-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-int-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-int-value-array-serializer-data
new file mode 100644
index 0000000..6966366
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-int-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-int-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-int-value-array-serializer-snapshot
new file mode 100644
index 0000000..08dd772
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-int-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-long-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-long-value-array-serializer-data
new file mode 100644
index 0000000..b6f6e75
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-long-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-long-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-long-value-array-serializer-snapshot
new file mode 100644
index 0000000..4650904
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-long-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-null-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-null-value-array-serializer-data
new file mode 100644
index 0000000..806e8ab
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-null-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-null-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-null-value-array-serializer-snapshot
new file mode 100644
index 0000000..15bac11
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-null-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-short-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-short-value-array-serializer-data
new file mode 100644
index 0000000..b51a799
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-short-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-short-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-short-value-array-serializer-snapshot
new file mode 100644
index 0000000..04f4fbb
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-short-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-string-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-string-value-array-serializer-data
new file mode 100644
index 0000000..0dd9644
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-string-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.6-string-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-string-value-array-serializer-snapshot
new file mode 100644
index 0000000..fba2b16
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.6-string-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-array-list-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-array-list-serializer-data
new file mode 100644
index 0000000..7b6c68a
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-array-list-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-array-list-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-array-list-serializer-snapshot
new file mode 100644
index 0000000..cf43209
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-array-list-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-generic-type-serializer-address-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-generic-type-serializer-address-data
new file mode 100644
index 0000000..74acf72
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-generic-type-serializer-address-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot
new file mode 100644
index 0000000..f27d2dc
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-generic-type-serializer-address-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-type-serializer-address-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-type-serializer-address-data
new file mode 100644
index 0000000..74acf72
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-type-serializer-address-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot
new file mode 100644
index 0000000..7c8f6c2
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-avro-type-serializer-address-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-byte-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-byte-value-array-serializer-data
new file mode 100644
index 0000000..a03b9c2
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-byte-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-byte-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-byte-value-array-serializer-snapshot
new file mode 100644
index 0000000..3489a4f
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-byte-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-char-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-char-value-array-serializer-data
new file mode 100644
index 0000000..918413b
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-char-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-char-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-char-value-array-serializer-snapshot
new file mode 100644
index 0000000..80987e5
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-char-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-double-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-double-value-array-serializer-data
new file mode 100644
index 0000000..a144299
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-double-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-double-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-double-value-array-serializer-snapshot
new file mode 100644
index 0000000..8af7a6e
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-double-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-either-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-either-serializer-data
new file mode 100644
index 0000000..3aabc0b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-either-serializer-data
@@ -0,0 +1 @@
+hello worldhello worldhello worldhello worldhello worldhello worldhello worldhello worldhello worldhello world
\ No newline at end of file
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-either-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-either-serializer-snapshot
new file mode 100644
index 0000000..16242c0
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-either-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-float-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-float-value-array-serializer-data
new file mode 100644
index 0000000..dc10ffb
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-float-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-float-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-float-value-array-serializer-snapshot
new file mode 100644
index 0000000..b981c5d
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-float-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-int-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-int-value-array-serializer-data
new file mode 100644
index 0000000..6966366
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-int-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-int-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-int-value-array-serializer-snapshot
new file mode 100644
index 0000000..1842963
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-int-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-long-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-long-value-array-serializer-data
new file mode 100644
index 0000000..b6f6e75
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-long-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-long-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-long-value-array-serializer-snapshot
new file mode 100644
index 0000000..c24e00a
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-long-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-null-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-null-value-array-serializer-data
new file mode 100644
index 0000000..806e8ab
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-null-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-null-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-null-value-array-serializer-snapshot
new file mode 100644
index 0000000..9b8db64
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-null-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-short-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-short-value-array-serializer-data
new file mode 100644
index 0000000..b51a799
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-short-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-short-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-short-value-array-serializer-snapshot
new file mode 100644
index 0000000..32de4d4
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-short-value-array-serializer-snapshot differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-string-value-array-serializer-data b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-string-value-array-serializer-data
new file mode 100644
index 0000000..0dd9644
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-string-value-array-serializer-data differ
diff --git a/flink-libraries/flink-gelly/src/test/resources/flink-1.7-string-value-array-serializer-snapshot b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-string-value-array-serializer-snapshot
new file mode 100644
index 0000000..af2ae9d
Binary files /dev/null and b/flink-libraries/flink-gelly/src/test/resources/flink-1.7-string-value-array-serializer-snapshot differ
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..bcf18f5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/JavaSerializerSnapshotMigrationTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Migration test for the {@link JavaSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class JavaSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Serializable> {
+
+	private static final String SPEC_NAME = "java-serializer";
+
+	public JavaSerializerSnapshotMigrationTest(TestSpecification<Serializable> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			JavaSerializer.class,
+			JavaSerializer.JavaSerializerSnapshot.class,
+			() -> new JavaSerializer<>());
+
+		return testSpecifications.get();
+	}
+}
+
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespacieSerializerSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespacieSerializerSnapshotMigrationTest.java
new file mode 100644
index 0000000..ac5d69e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespacieSerializerSnapshotMigrationTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration test for the {@link VoidNamespaceSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class VoidNamespacieSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<VoidNamespace> {
+
+	private static final String SPEC_NAME = "void-namespace-serializer";
+
+	public VoidNamespacieSerializerSnapshotMigrationTest(TestSpecification<VoidNamespace> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			SPEC_NAME,
+			VoidNamespaceSerializer.class,
+			VoidNamespaceSerializer.VoidNamespaceSerializerSnapshot.class,
+			() -> VoidNamespaceSerializer.INSTANCE);
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-runtime/src/test/resources/flink-1.6-java-serializer-data b/flink-runtime/src/test/resources/flink-1.6-java-serializer-data
new file mode 100644
index 0000000..7579df0
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.6-java-serializer-data differ
diff --git a/flink-runtime/src/test/resources/flink-1.6-java-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.6-java-serializer-snapshot
new file mode 100644
index 0000000..cec8f75
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.6-java-serializer-snapshot differ
diff --git a/flink-runtime/src/test/resources/flink-1.6-void-namespace-serializer-data b/flink-runtime/src/test/resources/flink-1.6-void-namespace-serializer-data
new file mode 100644
index 0000000..cb43b5c
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.6-void-namespace-serializer-data differ
diff --git a/flink-runtime/src/test/resources/flink-1.6-void-namespace-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.6-void-namespace-serializer-snapshot
new file mode 100644
index 0000000..be8d4f4
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.6-void-namespace-serializer-snapshot differ
diff --git a/flink-runtime/src/test/resources/flink-1.7-java-serializer-data b/flink-runtime/src/test/resources/flink-1.7-java-serializer-data
new file mode 100644
index 0000000..7579df0
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-java-serializer-data differ
diff --git a/flink-runtime/src/test/resources/flink-1.7-java-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.7-java-serializer-snapshot
new file mode 100644
index 0000000..0bb9093
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-java-serializer-snapshot differ
diff --git a/flink-runtime/src/test/resources/flink-1.7-void-namespace-serializer-data b/flink-runtime/src/test/resources/flink-1.7-void-namespace-serializer-data
new file mode 100644
index 0000000..cb43b5c
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-void-namespace-serializer-data differ
diff --git a/flink-runtime/src/test/resources/flink-1.7-void-namespace-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.7-void-namespace-serializer-snapshot
new file mode 100644
index 0000000..a12698d
Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-void-namespace-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowSerializerSnapshotsMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowSerializerSnapshotsMigrationTest.java
new file mode 100644
index 0000000..94dd4fb
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowSerializerSnapshotsMigrationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.testutils.migration.MigrationVersion;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Collection;
+
+/**
+ * Migration tests for the {@link TimeWindow.Serializer} and {@link GlobalWindow.Serializer}.
+ */
+@RunWith(Parameterized.class)
+public class WindowSerializerSnapshotsMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Object> {
+
+	public WindowSerializerSnapshotsMigrationTest(TestSpecification<Object> testSpecification) {
+		super(testSpecification);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Parameterized.Parameters(name = "Test Specification = {0}")
+	public static Collection<TestSpecification<?>> testSpecifications() {
+
+		final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);
+
+		testSpecifications.add(
+			"time-window-serializer",
+			TimeWindow.Serializer.class,
+			TimeWindow.Serializer.TimeWindowSerializerSnapshot.class,
+			TimeWindow.Serializer::new);
+		testSpecifications.add(
+			"global-window-serializer",
+			GlobalWindow.Serializer.class,
+			GlobalWindow.Serializer.GlobalWindowSerializerSnapshot.class,
+			GlobalWindow.Serializer::new);
+
+		return testSpecifications.get();
+	}
+}
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-global-window-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-global-window-serializer-data
new file mode 100644
index 0000000..cb43b5c
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-global-window-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-global-window-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-global-window-serializer-snapshot
new file mode 100644
index 0000000..bb83337
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-global-window-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-time-window-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-time-window-serializer-data
new file mode 100644
index 0000000..5ee28f7
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-time-window-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.6-time-window-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-time-window-serializer-snapshot
new file mode 100644
index 0000000..2f2f17b
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-time-window-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-global-window-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-global-window-serializer-data
new file mode 100644
index 0000000..cb43b5c
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-global-window-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-global-window-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-global-window-serializer-snapshot
new file mode 100644
index 0000000..82a2fc0
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-global-window-serializer-snapshot differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-time-window-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-time-window-serializer-data
new file mode 100644
index 0000000..2d241e0
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-time-window-serializer-data differ
diff --git a/flink-streaming-java/src/test/resources/flink-1.7-time-window-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-time-window-serializer-snapshot
new file mode 100644
index 0000000..bed39b8
Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-time-window-serializer-snapshot differ


[flink] 01/05: [FLINK-11328] [core] Upgrade parameterless / singleton serializers to use new serialization compatibility APIs

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit edf6d59d315fc0f88d35d6686b39891864537620
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 14 16:46:48 2019 +0100

    [FLINK-11328] [core] Upgrade parameterless / singleton serializers to use new serialization compatibility APIs
---
 .../connectors/kafka/FlinkKafkaProducer011.java    | 36 +++++++++
 .../connectors/kafka/FlinkKafkaProducer.java       | 38 ++++++++++
 .../ParameterlessTypeSerializerConfig.java         |  4 +
 .../typeutils/SimpleTypeSerializerSnapshot.java    | 88 +++++-----------------
 .../common/typeutils/base/BigDecSerializer.java    |  3 +-
 .../common/typeutils/base/BigIntSerializer.java    |  3 +-
 .../common/typeutils/base/BooleanSerializer.java   |  3 +-
 .../typeutils/base/BooleanValueSerializer.java     |  3 +-
 .../api/common/typeutils/base/ByteSerializer.java  |  3 +-
 .../common/typeutils/base/ByteValueSerializer.java |  3 +-
 .../api/common/typeutils/base/CharSerializer.java  |  3 +-
 .../common/typeutils/base/CharValueSerializer.java |  3 +-
 .../api/common/typeutils/base/DateSerializer.java  |  3 +-
 .../common/typeutils/base/DoubleSerializer.java    |  3 +-
 .../typeutils/base/DoubleValueSerializer.java      |  3 +-
 .../api/common/typeutils/base/FloatSerializer.java |  3 +-
 .../typeutils/base/FloatValueSerializer.java       |  3 +-
 .../common/typeutils/base/InstantSerializer.java   |  3 +-
 .../api/common/typeutils/base/IntSerializer.java   |  3 +-
 .../common/typeutils/base/IntValueSerializer.java  |  3 +-
 .../api/common/typeutils/base/LongSerializer.java  |  3 +-
 .../common/typeutils/base/LongValueSerializer.java |  3 +-
 .../common/typeutils/base/NullValueSerializer.java |  3 +-
 .../api/common/typeutils/base/ShortSerializer.java |  3 +-
 .../typeutils/base/ShortValueSerializer.java       |  3 +-
 .../common/typeutils/base/SqlDateSerializer.java   |  3 +-
 .../common/typeutils/base/SqlTimeSerializer.java   |  3 +-
 .../typeutils/base/SqlTimestampSerializer.java     |  3 +-
 .../common/typeutils/base/StringSerializer.java    |  3 +-
 .../typeutils/base/StringValueSerializer.java      |  3 +-
 .../typeutils/base/TypeSerializerSingleton.java    | 12 ++-
 .../api/common/typeutils/base/VoidSerializer.java  |  3 +-
 .../array/BooleanPrimitiveArraySerializer.java     |  3 +-
 .../base/array/BytePrimitiveArraySerializer.java   |  3 +-
 .../base/array/CharPrimitiveArraySerializer.java   |  3 +-
 .../base/array/DoublePrimitiveArraySerializer.java |  3 +-
 .../base/array/FloatPrimitiveArraySerializer.java  |  3 +-
 .../base/array/IntPrimitiveArraySerializer.java    |  3 +-
 .../base/array/LongPrimitiveArraySerializer.java   |  3 +-
 .../base/array/ShortPrimitiveArraySerializer.java  |  3 +-
 .../base/array/StringArraySerializer.java          |  3 +-
 .../java/org/apache/flink/cep/nfa/DeweyNumber.java | 20 +++++
 .../apache/flink/cep/nfa/NFAStateSerializer.java   | 19 +++++
 .../apache/flink/cep/nfa/sharedbuffer/EventId.java | 20 +++++
 .../apache/flink/cep/nfa/sharedbuffer/NodeId.java  | 19 +++++
 .../cep/nfa/sharedbuffer/SharedBufferEdge.java     | 20 +++++
 .../cep/nfa/sharedbuffer/SharedBufferNode.java     | 20 +++++
 .../transform/LongValueWithProperHashCode.java     | 21 +++++-
 .../types/valuearray/ByteValueArraySerializer.java | 20 ++++-
 .../types/valuearray/CharValueArraySerializer.java | 21 +++++-
 .../valuearray/DoubleValueArraySerializer.java     | 21 +++++-
 .../valuearray/FloatValueArraySerializer.java      | 21 +++++-
 .../types/valuearray/IntValueArraySerializer.java  | 21 +++++-
 .../types/valuearray/LongValueArraySerializer.java | 21 +++++-
 .../types/valuearray/NullValueArraySerializer.java | 20 +++++
 .../valuearray/ShortValueArraySerializer.java      | 21 +++++-
 .../valuearray/StringValueArraySerializer.java     | 21 +++++-
 .../client/VoidNamespaceSerializer.java            | 20 +++++
 .../apache/flink/runtime/state/JavaSerializer.java | 20 +++++
 .../runtime/state/VoidNamespaceSerializer.java     | 20 +++++
 .../flink/api/scala/typeutils/UnitSerializer.scala | 27 +++++++
 .../api/windowing/windows/GlobalWindow.java        | 20 +++++
 .../api/windowing/windows/TimeWindow.java          | 20 +++++
 .../jar/CheckpointingCustomKvStateProgram.java     | 20 +++++
 64 files changed, 610 insertions(+), 149 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 3e7cf2b..129ed66 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
@@ -1230,6 +1232,23 @@ public class FlinkKafkaProducer011<IN>
 		public boolean canEqual(Object obj) {
 			return obj instanceof TransactionStateSerializer;
 		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<KafkaTransactionState> snapshotConfiguration() {
+			return new TransactionStateSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionState> {
+			public TransactionStateSerializerSnapshot() {
+				super(TransactionStateSerializer::new);
+			}
+		}
 	}
 
 	/**
@@ -1312,6 +1331,23 @@ public class FlinkKafkaProducer011<IN>
 		public boolean canEqual(Object obj) {
 			return obj instanceof ContextStateSerializer;
 		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
+			return new ContextStateSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
+			public ContextStateSerializerSnapshot() {
+				super(ContextStateSerializer::new);
+			}
+		}
 	}
 
 	/**
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 10e8ef1..15b5b9f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -26,6 +26,8 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
@@ -1236,6 +1238,24 @@ public class FlinkKafkaProducer<IN>
 		public boolean canEqual(Object obj) {
 			return obj instanceof FlinkKafkaProducer.TransactionStateSerializer;
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> snapshotConfiguration() {
+			return new TransactionStateSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> {
+
+			public TransactionStateSerializerSnapshot() {
+				super(TransactionStateSerializer::new);
+			}
+		}
 	}
 
 	/**
@@ -1318,6 +1338,24 @@ public class FlinkKafkaProducer<IN>
 		public boolean canEqual(Object obj) {
 			return obj instanceof FlinkKafkaProducer.ContextStateSerializer;
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
+			return new ContextStateSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
+
+			public ContextStateSerializerSnapshot() {
+				super(ContextStateSerializer::new);
+			}
+		}
 	}
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
index 6fc6d17..29da90a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
@@ -27,8 +27,12 @@ import java.io.IOException;
 
 /**
  * A base class for {@link TypeSerializerConfigSnapshot}s that do not have any parameters.
+ *
+ * @deprecated this snapshot class is no longer used by any serializers, and is maintained only
+ *             for backward compatibility reasons. It is fully replaced by {@link SimpleTypeSerializerSnapshot}.
  */
 @Internal
+@Deprecated
 public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerConfigSnapshot<T> {
 
 	private static final int VERSION = 1;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
index 03fc1cb..dca34e9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/SimpleTypeSerializerSnapshot.java
@@ -21,15 +21,13 @@ package org.apache.flink.api.common.typeutils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A simple base class for TypeSerializerSnapshots, for serializers that have no
@@ -46,24 +44,18 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
 	 * backwards compatible code paths in case we decide to make this snapshot backwards compatible with
 	 * the {@link ParameterlessTypeSerializerConfig}.
 	 */
-	private static final int CURRENT_VERSION = 2;
+	private static final int CURRENT_VERSION = 3;
 
 	/** The class of the serializer for this snapshot.
 	 * The field is null if the serializer was created for read and has not been read, yet. */
-	@Nullable
-	private Class<? extends TypeSerializer<T>> serializerClass;
-
-	/**
-	 * Default constructor for instantiation on restore (reading the snapshot).
-	 */
-	@SuppressWarnings("unused")
-	public SimpleTypeSerializerSnapshot() {}
+	@Nonnull
+	private Supplier<? extends TypeSerializer<T>> serializerSupplier;
 
 	/**
 	 * Constructor to create snapshot from serializer (writing the snapshot).
 	 */
-	public SimpleTypeSerializerSnapshot(@Nonnull Class<? extends TypeSerializer<T>> serializerClass) {
-		this.serializerClass = checkNotNull(serializerClass);
+	public SimpleTypeSerializerSnapshot(@Nonnull Supplier<? extends TypeSerializer<T>> serializerSupplier) {
+		this.serializerSupplier = checkNotNull(serializerSupplier);
 	}
 
 	// ------------------------------------------------------------------------
@@ -77,42 +69,39 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
 
 	@Override
 	public TypeSerializer<T> restoreSerializer() {
-		checkState(serializerClass != null);
-		return InstantiationUtil.instantiate(serializerClass);
+		return serializerSupplier.get();
 	}
 
 	@Override
 	public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
 
-		checkState(serializerClass != null);
-		return newSerializer.getClass() == serializerClass ?
+		return newSerializer.getClass() == serializerSupplier.get().getClass() ?
 				TypeSerializerSchemaCompatibility.compatibleAsIs() :
 				TypeSerializerSchemaCompatibility.incompatible();
 	}
 
 	@Override
 	public void writeSnapshot(DataOutputView out) throws IOException {
-		checkState(serializerClass != null);
-		out.writeUTF(serializerClass.getName());
+		// nothing to write; the serializer is obtained from the supplier on restore
 	}
 
 	@Override
 	public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) throws IOException {
 		switch (readVersion) {
-			case 2:
-				read(in, classLoader);
+			case 3: {
 				break;
-			default:
+			}
+			case 2: {
+				// we don't need the classname any more; read and drop to maintain compatibility
+				in.readUTF();
+				break;
+			}
+			default: {
 				throw new IOException("Unrecognized version: " + readVersion);
+			}
 		}
 	}
 
-	private void read(DataInputView in, ClassLoader classLoader) throws IOException {
-		final String className = in.readUTF();
-		final Class<?> clazz = resolveClassName(className, classLoader, false);
-		this.serializerClass = cast(clazz);
-	}
-
 	// ------------------------------------------------------------------------
 	//  standard utilities
 	// ------------------------------------------------------------------------
@@ -131,45 +120,4 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
 	public String toString() {
 		return getClass().getName();
 	}
-
-	// ------------------------------------------------------------------------
-	//  utilities
-	// ------------------------------------------------------------------------
-
-	private static Class<?> resolveClassName(String className, ClassLoader cl, boolean allowCanonicalName) throws IOException {
-		try {
-			return Class.forName(className, false, cl);
-		}
-		catch (ClassNotFoundException e) {
-			if (allowCanonicalName) {
-				try {
-					return Class.forName(guessClassNameFromCanonical(className), false, cl);
-				}
-				catch (ClassNotFoundException ignored) {}
-			}
-
-			// throw with original ClassNotFoundException
-			throw new IOException(
-						"Failed to read SimpleTypeSerializerSnapshot: Serializer class not found: " + className, e);
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private static <T> Class<? extends TypeSerializer<T>> cast(Class<?> clazz) throws IOException {
-		if (!TypeSerializer.class.isAssignableFrom(clazz)) {
-			throw new IOException("Failed to read SimpleTypeSerializerSnapshot. " +
-					"Serializer class name leads to a class that is not a TypeSerializer: " + clazz.getName());
-		}
-
-		return (Class<? extends TypeSerializer<T>>) clazz;
-	}
-
-	static String guessClassNameFromCanonical(String className) {
-		int lastDot = className.lastIndexOf('.');
-		if (lastDot > 0 && lastDot < className.length() - 1) {
-			return className.substring(0, lastDot) + '$' + className.substring(lastDot + 1);
-		} else {
-			return className;
-		}
-	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
index ed5a3a0..7e4e97f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
@@ -151,10 +151,11 @@ public final class BigDecSerializer extends TypeSerializerSingleton<BigDecimal>
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BigDecSerializerSnapshot extends SimpleTypeSerializerSnapshot<BigDecimal> {
 
 		public BigDecSerializerSnapshot() {
-			super(BigDecSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
index b4c3a19..ef4358e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
@@ -156,10 +156,11 @@ public final class BigIntSerializer extends TypeSerializerSingleton<BigInteger>
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BigIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<BigInteger> {
 
 		public BigIntSerializerSnapshot() {
-			super(BigIntSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 21c558b..84056e5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -99,10 +99,11 @@ public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BooleanSerializerSnapshot extends SimpleTypeSerializerSnapshot<Boolean> {
 
 		public BooleanSerializerSnapshot() {
-			super(BooleanSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index e66fe06..3e497ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -98,10 +98,11 @@ public final class BooleanValueSerializer extends TypeSerializerSingleton<Boolea
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BooleanValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<BooleanValue> {
 
 		public BooleanValueSerializerSnapshot() {
-			super(BooleanValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index ab9061f..5fdd9b6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -99,10 +99,11 @@ public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ByteSerializerSnapshot extends SimpleTypeSerializerSnapshot<Byte> {
 
 		public ByteSerializerSnapshot() {
-			super(ByteSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index e7d92bb..6f0cb76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -96,10 +96,11 @@ public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ByteValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<ByteValue> {
 
 		public ByteValueSerializerSnapshot() {
-			super(ByteValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 7805d54..cee0d0d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -99,10 +99,11 @@ public final class CharSerializer extends TypeSerializerSingleton<Character> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class CharSerializerSnapshot extends SimpleTypeSerializerSnapshot<Character> {
 
 		public CharSerializerSnapshot() {
-			super(CharSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 697513a..aa580fe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -96,10 +96,11 @@ public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class CharValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<CharValue> {
 
 		public CharValueSerializerSnapshot() {
-			super(CharValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
index 21c8432..0675e22 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java
@@ -114,10 +114,11 @@ public final class DateSerializer extends TypeSerializerSingleton<Date> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class DateSerializerSnapshot extends SimpleTypeSerializerSnapshot<Date> {
 
 		public DateSerializerSnapshot() {
-			super(DateSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index 447c548..cd4854b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -99,10 +99,11 @@ public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class DoubleSerializerSnapshot extends SimpleTypeSerializerSnapshot<Double> {
 
 		public DoubleSerializerSnapshot() {
-			super(DoubleSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index 27b9fe2..c0c6a27 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -96,10 +96,11 @@ public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleV
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class DoubleValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<DoubleValue> {
 
 		public DoubleValueSerializerSnapshot() {
-			super(DoubleValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index b88fbaf..6da6f47 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -99,10 +99,11 @@ public final class FloatSerializer extends TypeSerializerSingleton<Float> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class FloatSerializerSnapshot extends SimpleTypeSerializerSnapshot<Float> {
 
 		public FloatSerializerSnapshot() {
-			super(FloatSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index f5292ce..1bfc87d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -96,10 +96,11 @@ public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class FloatValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<FloatValue> {
 
 		public FloatValueSerializerSnapshot() {
-			super(FloatValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
index d39aba2..17f1b7a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/InstantSerializer.java
@@ -115,10 +115,11 @@ public final class InstantSerializer extends TypeSerializerSingleton<Instant> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class InstantSerializerSnapshot extends SimpleTypeSerializerSnapshot<Instant> {
 
 		public InstantSerializerSnapshot() {
-			super(InstantSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 0c87786..6d0d407 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -101,8 +101,9 @@ public final class IntSerializer extends TypeSerializerSingleton<Integer> {
 	 */
 	public static final class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
 
+		@SuppressWarnings("WeakerAccess")
 		public IntSerializerSnapshot() {
-			super(IntSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index fae1bb6..846df31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -96,10 +96,11 @@ public final class IntValueSerializer extends TypeSerializerSingleton<IntValue>
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class IntValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<IntValue> {
 
 		public IntValueSerializerSnapshot() {
-			super(IntValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 8100307..24cf7c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -99,10 +99,11 @@ public final class LongSerializer extends TypeSerializerSingleton<Long> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class LongSerializerSnapshot extends SimpleTypeSerializerSnapshot<Long> {
 
 		public LongSerializerSnapshot() {
-			super(LongSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index 2d1fea8..9f4b5d8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -96,10 +96,11 @@ public final class LongValueSerializer extends TypeSerializerSingleton<LongValue
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class LongValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValue> {
 
 		public LongValueSerializerSnapshot() {
-			super(LongValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
index 233cc56..48c0b57 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java
@@ -92,10 +92,11 @@ public final class NullValueSerializer extends TypeSerializerSingleton<NullValue
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class NullValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<NullValue> {
 
 		public NullValueSerializerSnapshot() {
-			super(NullValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index d1c78ec..fa6189d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -99,10 +99,11 @@ public final class ShortSerializer extends TypeSerializerSingleton<Short> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ShortSerializerSnapshot extends SimpleTypeSerializerSnapshot<Short> {
 
 		public ShortSerializerSnapshot() {
-			super(ShortSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index b8f815b..e667faa4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -96,10 +96,11 @@ public final class ShortValueSerializer extends TypeSerializerSingleton<ShortVal
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ShortValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<ShortValue> {
 
 		public ShortValueSerializerSnapshot() {
-			super(ShortValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
index 6b780a3..ab96dd9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlDateSerializer.java
@@ -115,10 +115,11 @@ public final class SqlDateSerializer extends TypeSerializerSingleton<Date> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class SqlDateSerializerSnapshot extends SimpleTypeSerializerSnapshot<Date> {
 
 		public SqlDateSerializerSnapshot() {
-			super(SqlDateSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
index 9218ca1..b5bf7b5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimeSerializer.java
@@ -119,10 +119,11 @@ public final class SqlTimeSerializer extends TypeSerializerSingleton<Time> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class SqlTimeSerializerSnapshot extends SimpleTypeSerializerSnapshot<Time> {
 
 		public SqlTimeSerializerSnapshot() {
-			super(SqlTimeSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
index e35c78b..5d7760c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/SqlTimestampSerializer.java
@@ -123,10 +123,11 @@ public final class SqlTimestampSerializer extends TypeSerializerSingleton<Timest
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class SqlTimestampSerializerSnapshot extends SimpleTypeSerializerSnapshot<Timestamp> {
 
 		public SqlTimestampSerializerSnapshot() {
-			super(SqlTimestampSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index 0761779..dc67a9c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -100,10 +100,11 @@ public final class StringSerializer extends TypeSerializerSingleton<String> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class StringSerializerSnapshot extends SimpleTypeSerializerSnapshot<String> {
 
 		public StringSerializerSnapshot() {
-			super(StringSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 9075cf4..498b0f0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -121,10 +121,11 @@ public final class StringValueSerializer extends TypeSerializerSingleton<StringV
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class StringValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<StringValue> {
 
 		public StringValueSerializerSnapshot() {
-			super(StringValueSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 4f01d76..437aa14 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 
 @Internal
 public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
@@ -53,13 +52,12 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
 		}
 	}
 
+	/**
+	 * @deprecated this is kept around for backwards compatibility.
+	 *             Can only be removed when {@link ParameterlessTypeSerializerConfig} is removed.
+	 */
 	@Override
-	public TypeSerializerSnapshot<T> snapshotConfiguration() {
-		// type serializer singletons should always be parameter-less
-		return new ParameterlessTypeSerializerConfig<>(getSerializationFormatIdentifier());
-	}
-
-	@Override
+	@Deprecated
 	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
 		if (configSnapshot instanceof ParameterlessTypeSerializerConfig
 				&& isCompatibleSerializationFormatIdentifier(
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
index ec87e2b..d9fae48 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
@@ -100,10 +100,11 @@ public final class VoidSerializer extends TypeSerializerSingleton<Void> {
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class VoidSerializerSnapshot extends SimpleTypeSerializerSnapshot<Void> {
 
 		public VoidSerializerSnapshot() {
-			super(VoidSerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index ecfd604..a8f76c4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -120,10 +120,11 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BooleanPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<boolean[]> {
 
 		public BooleanPrimitiveArraySerializerSnapshot() {
-			super(BooleanPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index 414ef17..803bd53 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -113,10 +113,11 @@ public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class BytePrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<byte[]> {
 
 		public BytePrimitiveArraySerializerSnapshot() {
-			super(BytePrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index f08062b..1e2357a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class CharPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<char[]> {
 
 		public CharPrimitiveArraySerializerSnapshot() {
-			super(CharPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index a38ae66..5deddc8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleto
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class DoublePrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<double[]> {
 
 		public DoublePrimitiveArraySerializerSnapshot() {
-			super(DoublePrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index d447793..f65bcf3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class FloatPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<float[]> {
 
 		public FloatPrimitiveArraySerializerSnapshot() {
-			super(FloatPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index cbfa04d..7923eca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class IntPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<int[]> {
 
 		public IntPrimitiveArraySerializerSnapshot() {
-			super(IntPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index f56978c..66afb32 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class LongPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<long[]> {
 
 		public LongPrimitiveArraySerializerSnapshot() {
-			super(LongPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index b9dd1a9..a93dd71 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -119,10 +119,11 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class ShortPrimitiveArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<short[]> {
 
 		public ShortPrimitiveArraySerializerSnapshot() {
-			super(ShortPrimitiveArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index 1cdf317..7c81aa6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -123,9 +123,10 @@ public final class StringArraySerializer extends TypeSerializerSingleton<String[
 	/**
 	 * Serializer configuration snapshot for compatibility and format evolution.
 	 */
+	@SuppressWarnings("WeakerAccess")
 	public static final class StringArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<String[]> {
 		public StringArraySerializerSnapshot() {
-			super(StringArraySerializer.class);
+			super(() -> INSTANCE);
 		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 68e0eec..53fffce 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
@@ -272,5 +274,23 @@ public class DeweyNumber implements Serializable {
 		public int hashCode() {
 			return elemSerializer.hashCode();
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<DeweyNumber> snapshotConfiguration() {
+			return new DeweyNumberSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class DeweyNumberSerializerSnapshot extends SimpleTypeSerializerSnapshot<DeweyNumber> {
+
+			public DeweyNumberSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index 05b6c91..ccbe25c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -187,4 +189,21 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		return true;
 	}
 
+	// -----------------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<NFAState> snapshotConfiguration() {
+		return new NFAStateSerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class NFAStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<NFAState> {
+
+		public NFAStateSerializerSnapshot() {
+			super(() -> INSTANCE);
+		}
+	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index c1a6ccb..045cf38 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -147,5 +149,23 @@ public class EventId implements Comparable<EventId> {
 		public boolean canEqual(Object obj) {
 			return obj.getClass().equals(EventIdSerializer.class);
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<EventId> snapshotConfiguration() {
+			return new EventIdSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class EventIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<EventId> {
+
+			public EventIdSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
index 3a13184..87dc2c3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
@@ -151,5 +153,22 @@ public class NodeId {
 			return obj.getClass().equals(NodeIdSerializer.class);
 		}
 
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<NodeId> snapshotConfiguration() {
+			return new NodeIdSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class NodeIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<NodeId> {
+
+			public NodeIdSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
index c8d9021..2af92f5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.cep.nfa.DeweyNumber;
 import org.apache.flink.core.memory.DataInputView;
@@ -122,5 +124,23 @@ public class SharedBufferEdge {
 		public boolean canEqual(Object obj) {
 			return obj.getClass().equals(SharedBufferEdgeSerializer.class);
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<SharedBufferEdge> snapshotConfiguration() {
+			return new SharedBufferEdgeSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class SharedBufferEdgeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferEdge> {
+
+			public SharedBufferEdgeSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
index b613625..96afdbb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge.SharedBufferEdgeSerializer;
@@ -116,5 +118,23 @@ public class SharedBufferNode {
 		public boolean canEqual(Object obj) {
 			return obj.getClass().equals(SharedBufferNodeSerializer.class);
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<SharedBufferNode> snapshotConfiguration() {
+			return new SharedBufferNodeSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class SharedBufferNodeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferNode> {
+
+			public SharedBufferNodeSerializerSnapshot() {
+				super(SharedBufferNodeSerializer::new);
+			}
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
index 22a681c..6fc7bd0 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/transform/LongValueWithProperHashCode.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongValueComparator;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -220,10 +221,22 @@ extends LongValue {
 			return obj instanceof LongValueWithProperHashCodeSerializer;
 		}
 
+		// -----------------------------------------------------------------------------------
+
 		@Override
-		protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-			return super.isCompatibleSerializationFormatIdentifier(identifier)
-				|| identifier.equals(LongSerializer.class.getCanonicalName());
+		public TypeSerializerSnapshot<LongValueWithProperHashCode> snapshotConfiguration() {
+			return new LongValueWithProperHashCodeSerializerSnapshot();
+		}
+
+		/**
+		 * Serializer configuration snapshot for compatibility and format evolution.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class LongValueWithProperHashCodeSerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValueWithProperHashCode> {
+
+			public LongValueWithProperHashCodeSerializerSnapshot() {
+				super(LongValueWithProperHashCodeSerializer::new);
+			}
 		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
index a49359a..c690543 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ByteValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,20 @@ public final class ByteValueArraySerializer extends TypeSerializerSingleton<Byte
 		return obj instanceof ByteValueArraySerializer;
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<ByteValueArray> snapshotConfiguration() {
+		return new ByteValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class ByteValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<ByteValueArray> {
+		public ByteValueArraySerializerSnapshot() {
+			super(ByteValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
index 4d97cdb..0844986 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/CharValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,21 @@ public final class CharValueArraySerializer extends TypeSerializerSingleton<Char
 		return obj instanceof CharValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<CharValueArray> snapshotConfiguration() {
+		return new CharValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class CharValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<CharValueArray> {
+
+		public CharValueArraySerializerSnapshot() {
+			super(CharValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
index 5808946..be683b5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/DoubleValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,21 @@ public final class DoubleValueArraySerializer extends TypeSerializerSingleton<Do
 		return obj instanceof DoubleValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<DoubleValueArray> snapshotConfiguration() {
+		return new DoubleValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class DoubleValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<DoubleValueArray> {
+
+		public DoubleValueArraySerializerSnapshot() {
+			super(DoubleValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
index fc78fd2..0fb0f61 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/FloatValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,21 @@ public final class FloatValueArraySerializer extends TypeSerializerSingleton<Flo
 		return obj instanceof FloatValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<FloatValueArray> snapshotConfiguration() {
+		return new FloatValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class FloatValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<FloatValueArray> {
+
+		public FloatValueArraySerializerSnapshot() {
+			super(FloatValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
index 5984122..c00c136 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/IntValueArraySerializer.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.IntPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -84,9 +85,21 @@ public final class IntValueArraySerializer extends TypeSerializerSingleton<IntVa
 		return obj instanceof IntValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(IntPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<IntValueArray> snapshotConfiguration() {
+		return new IntValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class IntValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<IntValueArray> {
+
+		public IntValueArraySerializerSnapshot() {
+			super(IntValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
index ade46ca..55a79b3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/LongValueArraySerializer.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -84,9 +85,21 @@ public final class LongValueArraySerializer extends TypeSerializerSingleton<Long
 		return obj instanceof LongValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<LongValueArray> snapshotConfiguration() {
+		return new LongValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class LongValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<LongValueArray> {
+
+		public LongValueArraySerializerSnapshot() {
+			super(LongValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
index 233ed20..732bb0f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/NullValueArraySerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -82,4 +84,22 @@ public final class NullValueArraySerializer extends TypeSerializerSingleton<Null
 	public boolean canEqual(Object obj) {
 		return obj instanceof NullValueArraySerializer;
 	}
+
+	// -----------------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<NullValueArray> snapshotConfiguration() {
+		return new NullValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class NullValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<NullValueArray> {
+
+		public NullValueArraySerializerSnapshot() {
+			super(NullValueArraySerializer::new);
+		}
+	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
index bce4d81..2000799 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/ShortValueArraySerializer.java
@@ -19,8 +19,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -85,9 +86,21 @@ public final class ShortValueArraySerializer extends TypeSerializerSingleton<Sho
 		return obj instanceof ShortValueArraySerializer;
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(LongPrimitiveArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<ShortValueArray> snapshotConfiguration() {
+		return new ShortValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot (for compatibility and format evolution) that passes {@code ShortValueArraySerializer::new} to {@link SimpleTypeSerializerSnapshot} as its serializer supplier.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class ShortValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<ShortValueArray> {
+
+		public ShortValueArraySerializerSnapshot() {
+			super(ShortValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
index 6dbe0e5..d3dbe70 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/types/valuearray/StringValueArraySerializer.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.graph.types.valuearray;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -84,9 +85,21 @@ public final class StringValueArraySerializer extends TypeSerializerSingleton<St
 		return obj instanceof StringValueArraySerializer;
 	}
 
+	// -----------------------------------------------------------------------------------
+
 	@Override
-	protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-		return super.isCompatibleSerializationFormatIdentifier(identifier)
-			|| identifier.equals(StringArraySerializer.class.getCanonicalName());
+	public TypeSerializerSnapshot<StringValueArray> snapshotConfiguration() {
+		return new StringValueArraySerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot (for compatibility and format evolution) that passes {@code StringValueArraySerializer::new} to {@link SimpleTypeSerializerSnapshot} as its serializer supplier.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class StringValueArraySerializerSnapshot extends SimpleTypeSerializerSnapshot<StringValueArray> {
+
+		public StringValueArraySerializerSnapshot() {
+			super(StringValueArraySerializer::new);
+		}
 	}
 }
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
index 38db705..e6a9f10 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
@@ -19,6 +19,8 @@
 package org.apache.flink.queryablestate.client;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -93,4 +95,22 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN
 	public boolean canEqual(Object obj) {
 		return obj instanceof VoidNamespaceSerializer;
 	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
+		return new VoidNamespaceSerializerSnapshot();
+	}
+
+	/**
+	 * Serializer configuration snapshot for compatibility and format evolution; a static nested class so its no-argument constructor needs no enclosing serializer instance.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class VoidNamespaceSerializerSnapshot extends SimpleTypeSerializerSnapshot<VoidNamespace> {
+
+		public VoidNamespaceSerializerSnapshot() {
+			super(() -> INSTANCE);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 7d9e888..60b5fd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
@@ -97,4 +99,22 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializerSinglet
 	public boolean canEqual(Object obj) {
 		return obj instanceof JavaSerializer;
 	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<T> snapshotConfiguration() {
+		return new JavaSerializerSnapshot<>();
+	}
+
+	/**
+	 * Configuration snapshot (for compatibility and format evolution) that passes {@code JavaSerializer::new} to {@link SimpleTypeSerializerSnapshot} as its serializer supplier.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class JavaSerializerSnapshot<T extends Serializable> extends SimpleTypeSerializerSnapshot<T> {
+
+		public JavaSerializerSnapshot() {
+			super(JavaSerializer::new);
+		}
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
index 8b58891..c861acf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -89,4 +91,22 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN
 	public boolean canEqual(Object obj) {
 		return obj instanceof VoidNamespaceSerializer;
 	}
+
+	// -----------------------------------------------------------------------------------
+
+	@Override
+	public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
+		return new VoidNamespaceSerializerSnapshot();
+	}
+
+	/**
+	 * Configuration snapshot (for compatibility and format evolution) whose serializer supplier returns the enclosing serializer's {@code INSTANCE}.
+	 */
+	@SuppressWarnings("WeakerAccess")
+	public static final class VoidNamespaceSerializerSnapshot extends SimpleTypeSerializerSnapshot<VoidNamespace> {
+
+		public VoidNamespaceSerializerSnapshot() {
+			super(() -> INSTANCE);
+		}
+	}
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
index 32c44d2..8a49b18 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
@@ -17,8 +17,12 @@
  */
 package org.apache.flink.api.scala.typeutils
 
+import java.util.function.Supplier
+
 import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.api.scala.typeutils.UnitSerializer.UnitSerializerSnapshot
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
 @Internal
@@ -58,4 +62,27 @@ class UnitSerializer extends TypeSerializerSingleton[Unit] {
   override def canEqual(obj: scala.Any): Boolean = {
     obj.isInstanceOf[UnitSerializer]
   }
+
+  // -----------------------------------------------------------------------------------
+
+  /** Returns a new [[UnitSerializerSnapshot]] as this serializer's configuration snapshot. */
+  override def snapshotConfiguration(): TypeSerializerSnapshot[Unit] = {
+    new UnitSerializerSnapshot()
+  }
+
+  /*
+   * The snapshot class, UnitSerializerSnapshot, is defined in the companion object.
+   */
+}
+
+object UnitSerializer {
+
+  /**
+    * Configuration snapshot (for compatibility and format evolution) whose supplier creates a new [[UnitSerializer]].
+    */
+  final class UnitSerializerSnapshot
+        extends SimpleTypeSerializerSnapshot[Unit](
+    new Supplier[TypeSerializer[Unit]] {
+      override def get() = new UnitSerializer()
+    })
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
index 835197f..0e09b2e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
@@ -19,7 +19,9 @@
 package org.apache.flink.streaming.api.windowing.windows;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -120,5 +122,23 @@ public class GlobalWindow extends Window {
 		public boolean canEqual(Object obj) {
 			return obj instanceof Serializer;
 		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<GlobalWindow> snapshotConfiguration() {
+			return new GlobalWindowSerializerSnapshot();
+		}
+
+		/**
+		 * Configuration snapshot (for compatibility and format evolution) that passes {@code GlobalWindow.Serializer::new} to {@link SimpleTypeSerializerSnapshot} as its serializer supplier.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class GlobalWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<GlobalWindow> {
+
+			public GlobalWindowSerializerSnapshot() {
+				super(GlobalWindow.Serializer::new);
+			}
+		}
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 0e89294..a778ebf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -19,6 +19,8 @@
 package org.apache.flink.streaming.api.windowing.windows;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
@@ -188,6 +190,24 @@ public class TimeWindow extends Window {
 		public boolean canEqual(Object obj) {
 			return obj instanceof Serializer;
 		}
+
+		// ------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<TimeWindow> snapshotConfiguration() {
+			return new TimeWindowSerializerSnapshot();
+		}
+
+		/**
+		 * Configuration snapshot (for compatibility and format evolution) that passes {@code Serializer::new} to {@link SimpleTypeSerializerSnapshot} as its serializer supplier.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class TimeWindowSerializerSnapshot extends SimpleTypeSerializerSnapshot<TimeWindow> {
+
+			public TimeWindowSerializerSnapshot() {
+				super(Serializer::new);
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
index aa8e59e..67ada5d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointingCustomKvStateProgram.java
@@ -24,7 +24,9 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -230,5 +232,23 @@ public class CheckpointingCustomKvStateProgram {
 		public boolean canEqual(Object obj) {
 			return obj instanceof CustomIntSerializer;
 		}
+
+		// -----------------------------------------------------------------------------------
+
+		@Override
+		public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
+			return new CustomIntSerializerSnapshot();
+		}
+
+		/**
+		 * Configuration snapshot (for compatibility and format evolution) whose serializer supplier returns the enclosing serializer's {@code INSTANCE}.
+		 */
+		@SuppressWarnings("WeakerAccess")
+		public static final class CustomIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
+
+			public CustomIntSerializerSnapshot() {
+				super(() -> INSTANCE);
+			}
+		}
 	}
 }