You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/26 03:14:37 UTC

[GitHub] tzulitai closed pull request #7029: [BP-1.7] [FLINK-10789] New serializer snapshots after 1.6 should implement TypeSerializerSnapshot

tzulitai closed pull request #7029: [BP-1.7] [FLINK-10789] New serializer snapshots after 1.6 should implement TypeSerializerSnapshot
URL: https://github.com/apache/flink/pull/7029
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer displays its
diff once it has been merged, so the full diff is reproduced below:

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
index 2caf9fa18c6..5f89d94973f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.List;
@@ -48,7 +49,7 @@ public ListSerializerSnapshot() {}
 	 * Constructor to create the snapshot for writing.
 	 */
 	public ListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(elementSerializer);
+		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
index 92d17996ea5..be2e4b0cbe6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Map;
@@ -48,6 +49,8 @@ public MapSerializerSnapshot() {}
 	 * Constructor to create the snapshot for writing.
 	 */
 	public MapSerializerSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
+		Preconditions.checkNotNull(keySerializer);
+		Preconditions.checkNotNull(valueSerializer);
 		this.nestedKeyValueSerializerSnapshot = new CompositeSerializerSnapshot(keySerializer, valueSerializer);
 	}
 
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
index 81db582b640..7c32735d92c 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -59,6 +59,14 @@ under the License.
 
         <!-- test dependencies -->
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index a2eedaf138c..53475917d93 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -25,7 +25,6 @@
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -184,40 +183,29 @@ public boolean canEqual(Object obj) {
 		}
 
 		@Override
-		public TypeSerializerConfigSnapshot<Lockable<E>> snapshotConfiguration() {
-			return new LockableSerializerConfigSnapshot<>(elementSerializer);
+		public TypeSerializerSnapshot<Lockable<E>> snapshotConfiguration() {
+			return new LockableTypeSerializerSnapshot<>(elementSerializer);
 		}
 
+		/**
+		 * This cannot be removed until {@link TypeSerializerConfigSnapshot} is no longer supported.
+		 */
 		@Override
 		public CompatibilityResult<Lockable<E>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-			if (configSnapshot instanceof LockableSerializerConfigSnapshot) {
-				@SuppressWarnings("unchecked")
-				LockableSerializerConfigSnapshot<E> snapshot = (LockableSerializerConfigSnapshot<E>) configSnapshot;
-
-				Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> nestedSerializerAndConfig =
-					snapshot.getSingleNestedSerializerAndConfig();
-
-				CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-					nestedSerializerAndConfig.f0,
-					UnloadableDummyTypeSerializer.class,
-					nestedSerializerAndConfig.f1,
-					elementSerializer);
-
-				return (inputCompatibilityResult.isRequiresMigration())
-					? CompatibilityResult.requiresMigration()
-					: CompatibilityResult.compatible();
-			} else {
-				// backwards compatibility path
-				CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
-					configSnapshot.restoreSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					configSnapshot,
-					elementSerializer);
-
-				return (inputCompatibilityResult.isRequiresMigration())
-					? CompatibilityResult.requiresMigration()
-					: CompatibilityResult.compatible();
-			}
+			// backwards compatibility path
+			CompatibilityResult<E> inputCompatibilityResult = CompatibilityUtil.resolveCompatibilityResult(
+				configSnapshot.restoreSerializer(),
+				UnloadableDummyTypeSerializer.class,
+				configSnapshot,
+				elementSerializer);
+
+			return (inputCompatibilityResult.isRequiresMigration())
+				? CompatibilityResult.requiresMigration()
+				: CompatibilityResult.compatible();
+		}
+
+		TypeSerializer<E> getElementSerializer() {
+			return elementSerializer;
 		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableSerializerConfigSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableSerializerConfigSnapshot.java
deleted file mode 100644
index 9e78bc013ef..00000000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableSerializerConfigSnapshot.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa.sharedbuffer;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-
-/**
- * A {@link TypeSerializerConfigSnapshot} for the {@link Lockable.LockableTypeSerializer}.
- */
-@Internal
-public class LockableSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Lockable<E>> {
-
-	private static final int VERSION = 1;
-
-	/** This empty nullary constructor is required for deserializing the configuration. */
-	public LockableSerializerConfigSnapshot() {}
-
-	public LockableSerializerConfigSnapshot(TypeSerializer<E> elementSerializer) {
-		super(elementSerializer);
-	}
-
-	@Override
-	public int getVersion() {
-		return VERSION;
-	}
-
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
new file mode 100644
index 00000000000..44a4670cc1d
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TypeSerializerSnapshot} for the {@link Lockable.LockableTypeSerializer}.
+ */
+@Internal
+public class LockableTypeSerializerSnapshot<E> implements TypeSerializerSnapshot<Lockable<E>> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
+
+	/**
+	 * Constructor for read instantiation.
+	 */
+	public LockableTypeSerializerSnapshot() {}
+
+	/**
+	 * Constructor to create the snapshot for writing.
+	 */
+	public LockableTypeSerializerSnapshot(TypeSerializer<E> elementSerializer) {
+		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+	}
+
+	@Override
+	public int getCurrentVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	public TypeSerializer<Lockable<E>> restoreSerializer() {
+		return new Lockable.LockableTypeSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
+	}
+
+	@Override
+	public TypeSerializerSchemaCompatibility<Lockable<E>> resolveSchemaCompatibility(TypeSerializer<Lockable<E>> newSerializer) {
+		checkState(nestedElementSerializerSnapshot != null);
+
+		if (newSerializer instanceof Lockable.LockableTypeSerializer) {
+			Lockable.LockableTypeSerializer<E> serializer = (Lockable.LockableTypeSerializer<E>) newSerializer;
+
+			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
+				TypeSerializerSchemaCompatibility.compatibleAsIs(),
+				serializer.getElementSerializer());
+		}
+		else {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+	}
+
+	@Override
+	public void writeSnapshot(DataOutputView out) throws IOException {
+		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+	}
+
+	@Override
+	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	}
+
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
new file mode 100644
index 00000000000..bb3b7f27e10
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshotMigrationTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/**
+ * Migration test for the {@link LockableTypeSerializerSnapshot}.
+ */
+public class LockableTypeSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Lockable<String>> {
+
+	private static final String DATA = "flink-1.6-lockable-type-serializer-data";
+	private static final String SNAPSHOT = "flink-1.6-lockable-type-serializer-snapshot";
+
+	public LockableTypeSerializerSnapshotMigrationTest() {
+		super(
+			TestSpecification.<Lockable<String>>builder(
+					"1.6-lockable-type-serializer",
+					Lockable.LockableTypeSerializer.class,
+					LockableTypeSerializerSnapshot.class)
+				.withSerializerProvider(() -> new Lockable.LockableTypeSerializer<>(StringSerializer.INSTANCE))
+				.withSnapshotDataLocation(SNAPSHOT)
+				.withTestData(DATA, 10)
+		);
+	}
+}
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-lockable-type-serializer-data b/flink-libraries/flink-cep/src/test/resources/flink-1.6-lockable-type-serializer-data
new file mode 100644
index 00000000000..b17504031a6
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-lockable-type-serializer-data differ
diff --git a/flink-libraries/flink-cep/src/test/resources/flink-1.6-lockable-type-serializer-snapshot b/flink-libraries/flink-cep/src/test/resources/flink-1.6-lockable-type-serializer-snapshot
new file mode 100644
index 00000000000..3d9fa0c07fb
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/flink-1.6-lockable-type-serializer-snapshot differ
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
new file mode 100644
index 00000000000..cca84d28d17
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TypeSerializerSnapshot} for the {@link ListViewSerializer}.
+ *
+ * @param <T> the type of the list elements.
+ */
+public final class ListViewSerializerSnapshot<T> implements TypeSerializerSnapshot<ListView<T>> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	private CompositeSerializerSnapshot nestedListSerializerSnapshot;
+
+	/**
+	 * Constructor for read instantiation.
+	 */
+	public ListViewSerializerSnapshot() {}
+
+	/**
+	 * Constructor to create the snapshot for writing.
+	 */
+	public ListViewSerializerSnapshot(TypeSerializer<List<T>> listSerializer) {
+		this.nestedListSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(listSerializer));
+	}
+
+	@Override
+	public int getCurrentVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	public TypeSerializer<ListView<T>> restoreSerializer() {
+		return new ListViewSerializer<>(nestedListSerializerSnapshot.getRestoreSerializer(0));
+	}
+
+	@Override
+	public TypeSerializerSchemaCompatibility<ListView<T>> resolveSchemaCompatibility(TypeSerializer<ListView<T>> newSerializer) {
+		checkState(nestedListSerializerSnapshot != null);
+
+		if (newSerializer instanceof ListViewSerializer) {
+			ListViewSerializer<T> serializer = (ListViewSerializer<T>) newSerializer;
+
+			return nestedListSerializerSnapshot.resolveCompatibilityWithNested(
+				TypeSerializerSchemaCompatibility.compatibleAsIs(),
+				serializer.getListSerializer());
+		}
+		else {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+	}
+
+	@Override
+	public void writeSnapshot(DataOutputView out) throws IOException {
+		nestedListSerializerSnapshot.writeCompositeSnapshot(out);
+	}
+
+	@Override
+	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		this.nestedListSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	}
+}
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerConfigSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerConfigSnapshot.java
deleted file mode 100644
index 10b4419a8c7..00000000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerConfigSnapshot.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.dataview;
-
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.base.MapSerializer;
-import org.apache.flink.table.api.dataview.MapView;
-
-/**
- * A {@link TypeSerializerConfigSnapshot} for the {@link MapViewSerializer}.
- *
- * @param <K> the key type of the map entries.
- * @param <V> the value type of the map entries.
- */
-public class MapViewSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot<MapView<K, V>> {
-
-	private static final int VERSION = 1;
-
-	/** This empty nullary constructor is required for deserializing the configuration. */
-	public MapViewSerializerConfigSnapshot() {}
-
-	public MapViewSerializerConfigSnapshot(MapSerializer<K, V> mapSerializer) {
-		super(mapSerializer);
-	}
-
-	@Override
-	public int getVersion() {
-		return VERSION;
-	}
-}
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
new file mode 100644
index 00000000000..f59fc0a3654
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TypeSerializerSnapshot} for the {@link MapViewSerializer}.
+ *
+ * @param <K> the key type of the map entries.
+ * @param <V> the value type of the map entries.
+ */
+public class MapViewSerializerSnapshot<K, V> implements TypeSerializerSnapshot<MapView<K, V>> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	private CompositeSerializerSnapshot nestedMapSerializerSnapshot;
+
+	/**
+	 * Constructor for read instantiation.
+	 */
+	public MapViewSerializerSnapshot() {}
+
+	/**
+	 * Constructor to create the snapshot for writing.
+	 */
+	public MapViewSerializerSnapshot(TypeSerializer<Map<K, V>> mapSerializer) {
+		this.nestedMapSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(mapSerializer));
+	}
+
+	@Override
+	public int getCurrentVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	public TypeSerializer<MapView<K, V>> restoreSerializer() {
+		return new MapViewSerializer<>(nestedMapSerializerSnapshot.getRestoreSerializer(0));
+	}
+
+	@Override
+	public TypeSerializerSchemaCompatibility<MapView<K, V>> resolveSchemaCompatibility(
+			TypeSerializer<MapView<K, V>> newSerializer) {
+		checkState(nestedMapSerializerSnapshot != null);
+
+		if (newSerializer instanceof MapViewSerializer) {
+			MapViewSerializer<K, V> serializer = (MapViewSerializer<K, V>) newSerializer;
+
+			return nestedMapSerializerSnapshot.resolveCompatibilityWithNested(
+				TypeSerializerSchemaCompatibility.compatibleAsIs(),
+				serializer.getMapSerializer());
+		}
+		else {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+	}
+
+	@Override
+	public void writeSnapshot(DataOutputView out) throws IOException {
+		nestedMapSerializerSnapshot.writeCompositeSnapshot(out);
+	}
+
+	@Override
+	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		this.nestedMapSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	}
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
index 9a10c118c0b..246af6c0dab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
@@ -33,13 +33,14 @@ import org.apache.flink.table.api.dataview.ListView
   * @param listSerializer List serializer.
   * @tparam T The type of element in the list.
   */
-class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+@SerialVersionUID(-2030398712359267867L)
+class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]])
   extends TypeSerializer[ListView[T]] {
 
   override def isImmutableType: Boolean = false
 
   override def duplicate(): TypeSerializer[ListView[T]] = {
-    new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+    new ListViewSerializer[T](listSerializer.duplicate())
   }
 
   override def createInstance(): ListView[T] = {
@@ -75,41 +76,28 @@ class ListViewSerializer[T](val listSerializer: ListSerializer[T])
   override def equals(obj: Any): Boolean = canEqual(this) &&
     listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
 
-  override def snapshotConfiguration(): ListViewSerializerConfigSnapshot[T] =
-    new ListViewSerializerConfigSnapshot[T](listSerializer)
+  override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
+    new ListViewSerializerSnapshot[T](listSerializer)
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[ListView[T]] = {
 
     configSnapshot match {
-      case snapshot: ListViewSerializerConfigSnapshot[T] =>
-        val previousListSerializerAndConfig =
-          snapshot.getSingleNestedSerializerAndConfig
-
-        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
-          previousListSerializerAndConfig.f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousListSerializerAndConfig.f1,
-          listSerializer)
-
-        if (!compatResult.isRequiresMigration) {
-          CompatibilityResult.compatible[ListView[T]]
-        } else {
-          CompatibilityResult.requiresMigration[ListView[T]]
-        }
-
       // backwards compatibility path;
-      // Flink versions older or equal to 1.5.x returns a
+      // Flink versions older or equal to 1.6.x returns a
       // CollectionSerializerConfigSnapshot as the snapshot
       case legacySnapshot: CollectionSerializerConfigSnapshot[java.util.List[T], T] =>
         val previousListSerializerAndConfig =
           legacySnapshot.getSingleNestedSerializerAndConfig
 
+        // in older versions, the nested list serializer was always
+        // specifically a ListSerializer, so this cast is safe
+        val castedSer = listSerializer.asInstanceOf[ListSerializer[T]]
         val compatResult = CompatibilityUtil.resolveCompatibilityResult(
           previousListSerializerAndConfig.f0,
           classOf[UnloadableDummyTypeSerializer[_]],
           previousListSerializerAndConfig.f1,
-          listSerializer.getElementSerializer)
+          castedSer.getElementSerializer)
 
         if (!compatResult.isRequiresMigration) {
           CompatibilityResult.compatible[ListView[T]]
@@ -120,4 +108,6 @@ class ListViewSerializer[T](val listSerializer: ListSerializer[T])
       case _ => CompatibilityResult.requiresMigration[ListView[T]]
     }
   }
+
+  def getListSerializer: TypeSerializer[java.util.List[T]] = listSerializer
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
index d6419e81ab3..89cdf701749 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
@@ -35,14 +35,14 @@ import org.apache.flink.table.api.dataview.MapView
   * @tparam K The type of the keys in the map.
   * @tparam V The type of the values in the map.
   */
-class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+@SerialVersionUID(-9007142882049098705L)
+class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K, V]])
   extends TypeSerializer[MapView[K, V]] {
 
   override def isImmutableType: Boolean = false
 
   override def duplicate(): TypeSerializer[MapView[K, V]] =
-    new MapViewSerializer[K, V](
-      mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+    new MapViewSerializer[K, V](mapSerializer.duplicate())
 
   override def createInstance(): MapView[K, V] = {
     new MapView[K, V]()
@@ -77,30 +77,14 @@ class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
   override def equals(obj: Any): Boolean = canEqual(this) &&
     mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
 
-  override def snapshotConfiguration(): MapViewSerializerConfigSnapshot[K, V] =
-    new MapViewSerializerConfigSnapshot[K, V](mapSerializer)
+  override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
+    new MapViewSerializerSnapshot[K, V](mapSerializer)
 
   // copy and modified from MapSerializer.ensureCompatibility
   override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot[_])
   : CompatibilityResult[MapView[K, V]] = {
 
     configSnapshot match {
-      case snapshot: MapViewSerializerConfigSnapshot[K, V] =>
-        val previousKvSerializersAndConfigs =
-          snapshot.getNestedSerializersAndConfigs
-
-        val mapSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-          previousKvSerializersAndConfigs.get(0).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousKvSerializersAndConfigs.get(0).f1,
-          mapSerializer)
-
-        if (!mapSerializerCompatResult.isRequiresMigration) {
-          CompatibilityResult.compatible[MapView[K, V]]
-        } else {
-          CompatibilityResult.requiresMigration[MapView[K, V]]
-        }
-
       // backwards compatibility path;
       // Flink versions older or equal to 1.5.x returns a
       // MapSerializerConfigSnapshot as the snapshot
@@ -108,17 +92,20 @@ class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
         val previousKvSerializersAndConfigs =
           legacySnapshot.getNestedSerializersAndConfigs
 
+        // in older versions, the nested map serializer was always
+        // specifically a MapSerializer, so this cast is safe
+        val castedSer = mapSerializer.asInstanceOf[MapSerializer[K, V]]
         val keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
           previousKvSerializersAndConfigs.get(0).f0,
           classOf[UnloadableDummyTypeSerializer[_]],
           previousKvSerializersAndConfigs.get(0).f1,
-          mapSerializer)
+          castedSer.getKeySerializer)
 
         val valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
           previousKvSerializersAndConfigs.get(1).f0,
           classOf[UnloadableDummyTypeSerializer[_]],
           previousKvSerializersAndConfigs.get(1).f1,
-          mapSerializer.getValueSerializer)
+          castedSer.getValueSerializer)
 
         if (!keyCompatResult.isRequiresMigration && !valueCompatResult.isRequiresMigration) {
           CompatibilityResult.compatible[MapView[K, V]]
@@ -129,4 +116,6 @@ class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
       case _ => CompatibilityResult.requiresMigration[MapView[K, V]]
     }
   }
+
+  def getMapSerializer: TypeSerializer[java.util.Map[K, V]] = mapSerializer
 }
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerConfigSnapshot.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
similarity index 52%
rename from flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerConfigSnapshot.java
rename to flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
index b2126028de2..5465ada23ff 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerConfigSnapshot.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/ListViewSerializerSnapshotMigrationTest.java
@@ -18,29 +18,28 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.table.api.dataview.ListView;
 
 /**
- * A {@link TypeSerializerConfigSnapshot} for the {@link ListViewSerializer}.
- *
- * @param <T> the type of the list elements.
+ * Migration test for the {@link ListViewSerializerSnapshot}.
  */
-public final class ListViewSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<ListView<T>> {
-
-	private static final int VERSION = 1;
+public class ListViewSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<ListView<String>> {
 
-	/** This empty nullary constructor is required for deserializing the configuration. */
-	public ListViewSerializerConfigSnapshot() {}
-
-	public ListViewSerializerConfigSnapshot(ListSerializer<T> listSerializer) {
-		super(listSerializer);
-	}
+	private static final String DATA = "flink-1.6-list-view-serializer-data";
+	private static final String SNAPSHOT = "flink-1.6-list-view-serializer-snapshot";
 
-	@Override
-	public int getVersion() {
-		return VERSION;
+	public ListViewSerializerSnapshotMigrationTest() {
+		super(
+			TestSpecification.<ListView<String>>builder(
+					"1.6-list-view-serializer",
+					ListViewSerializer.class,
+					ListViewSerializerSnapshot.class)
+				.withSerializerProvider(() -> new ListViewSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)))
+				.withSnapshotDataLocation(SNAPSHOT)
+				.withTestData(DATA, 10)
+		);
 	}
 }
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
new file mode 100644
index 00000000000..66c7f173474
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/dataview/MapViewSerializerSnapshotMigrationTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.table.api.dataview.MapView;
+
+/**
+ * Migration test for the {@link MapViewSerializerSnapshot}.
+ */
+public class MapViewSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<MapView<Integer, String>> {
+
+	private static final String DATA = "flink-1.6-map-view-serializer-data";
+	private static final String SNAPSHOT = "flink-1.6-map-view-serializer-snapshot";
+
+	public MapViewSerializerSnapshotMigrationTest() {
+		super(
+			TestSpecification.<MapView<Integer, String>>builder(
+					"1.6-map-view-serializer",
+					MapViewSerializer.class,
+					MapViewSerializerSnapshot.class)
+				.withSerializerProvider(() -> new MapViewSerializer<>(
+					new MapSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE)))
+				.withSnapshotDataLocation(SNAPSHOT)
+				.withTestData(DATA, 10)
+		);
+	}
+}
diff --git a/flink-libraries/flink-table/src/test/resources/flink-1.6-list-view-serializer-data b/flink-libraries/flink-table/src/test/resources/flink-1.6-list-view-serializer-data
new file mode 100644
index 00000000000..7b6c68ac7f3
Binary files /dev/null and b/flink-libraries/flink-table/src/test/resources/flink-1.6-list-view-serializer-data differ
diff --git a/flink-libraries/flink-table/src/test/resources/flink-1.6-list-view-serializer-snapshot b/flink-libraries/flink-table/src/test/resources/flink-1.6-list-view-serializer-snapshot
new file mode 100644
index 00000000000..2e2fd8ab9f1
Binary files /dev/null and b/flink-libraries/flink-table/src/test/resources/flink-1.6-list-view-serializer-snapshot differ
diff --git a/flink-libraries/flink-table/src/test/resources/flink-1.6-map-view-serializer-data b/flink-libraries/flink-table/src/test/resources/flink-1.6-map-view-serializer-data
new file mode 100644
index 00000000000..6d68f192c03
Binary files /dev/null and b/flink-libraries/flink-table/src/test/resources/flink-1.6-map-view-serializer-data differ
diff --git a/flink-libraries/flink-table/src/test/resources/flink-1.6-map-view-serializer-snapshot b/flink-libraries/flink-table/src/test/resources/flink-1.6-map-view-serializer-snapshot
new file mode 100644
index 00000000000..9546a160ec0
Binary files /dev/null and b/flink-libraries/flink-table/src/test/resources/flink-1.6-map-view-serializer-snapshot differ
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index e7fc6e13c2b..bf1ea5f03b3 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -244,6 +244,7 @@ under the License.
 						</goals>
 						<configuration>
 							<sources>
+								<source>src/test/java</source>
 								<source>src/test/scala</source>
 							</sources>
 						</configuration>
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
deleted file mode 100644
index 8ad4e09a1e2..00000000000
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerConfigSnapshot.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.typeutils;
-
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import scala.util.Either;
-
-/**
- * Configuration snapshot for serializers of Scala's {@link Either} type,
- * containing configuration snapshots of the Left and Right serializers.
- */
-public class ScalaEitherSerializerConfigSnapshot<L, R>
-		extends CompositeTypeSerializerConfigSnapshot<Either<L, R>> {
-
-	private static final int VERSION = 1;
-
-	/** This empty nullary constructor is required for deserializing the configuration. */
-	public ScalaEitherSerializerConfigSnapshot() {}
-
-	public ScalaEitherSerializerConfigSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
-		super(leftSerializer, rightSerializer);
-	}
-
-	@Override
-	public int getVersion() {
-		return VERSION;
-	}
-}
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
new file mode 100644
index 00000000000..b3ff22cea0c
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+import scala.util.Either;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Configuration snapshot for serializers of Scala's {@link Either} type,
+ * containing configuration snapshots of the Left and Right serializers.
+ */
+public class ScalaEitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> {
+
+
+	private static final int CURRENT_VERSION = 1;
+
+	private CompositeSerializerSnapshot nestedLeftRightSerializerSnapshot;
+
+	/**
+	 * Constructor for read instantiation.
+	 */
+	public ScalaEitherSerializerSnapshot() {}
+
+	/**
+	 * Constructor to create the snapshot for writing.
+	 */
+	public ScalaEitherSerializerSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
+		Preconditions.checkNotNull(leftSerializer);
+		Preconditions.checkNotNull(rightSerializer);
+		this.nestedLeftRightSerializerSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+	}
+
+	@Override
+	public int getCurrentVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	public TypeSerializer<Either<L, R>> restoreSerializer() {
+		return new EitherSerializer<>(
+			nestedLeftRightSerializerSnapshot.getRestoreSerializer(0),
+			nestedLeftRightSerializerSnapshot.getRestoreSerializer(1));
+	}
+
+	@Override
+	public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
+			TypeSerializer<Either<L, R>> newSerializer) {
+		checkState(nestedLeftRightSerializerSnapshot != null);
+
+		if (newSerializer instanceof EitherSerializer) {
+			EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
+
+			return nestedLeftRightSerializerSnapshot.resolveCompatibilityWithNested(
+				TypeSerializerSchemaCompatibility.compatibleAsIs(),
+				serializer.getLeftSerializer(),
+				serializer.getRightSerializer());
+		}
+		else {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+	}
+
+	@Override
+	public void writeSnapshot(DataOutputView out) throws IOException {
+		nestedLeftRightSerializerSnapshot.writeCompositeSnapshot(out);
+	}
+
+	@Override
+	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		this.nestedLeftRightSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	}
+}
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 14f2196b9cc..68432a6f1a5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -116,21 +116,22 @@ class EitherSerializer[A, B](
     31 * leftSerializer.hashCode() + rightSerializer.hashCode()
   }
 
+  def getLeftSerializer: TypeSerializer[A] = leftSerializer
+
+  def getRightSerializer: TypeSerializer[B] = rightSerializer
+
   // --------------------------------------------------------------------------------------------
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): ScalaEitherSerializerConfigSnapshot[A, B] = {
-    new ScalaEitherSerializerConfigSnapshot[A, B](leftSerializer, rightSerializer)
+  override def snapshotConfiguration(): ScalaEitherSerializerSnapshot[A, B] = {
+    new ScalaEitherSerializerSnapshot[A, B](leftSerializer, rightSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Either[A, B]] = {
 
     configSnapshot match {
-      case eitherSerializerConfig: ScalaEitherSerializerConfigSnapshot[A, B] =>
-        checkCompatibility(eitherSerializerConfig)
-
       // backwards compatibility path;
       // Flink versions older or equal to 1.5.x uses a
       // EitherSerializerConfigSnapshot as the snapshot
diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
new file mode 100644
index 00000000000..9cd8b5d55b5
--- /dev/null
+++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshotMigrationTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+import scala.util.Either;
+
+/**
+ * Migration test for the {@link ScalaEitherSerializerSnapshot}.
+ */
+public class ScalaEitherSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Either<Integer, String>> {
+
+	private static final String DATA = "flink-1.6-scala-either-serializer-data";
+	private static final String SNAPSHOT = "flink-1.6-scala-either-serializer-snapshot";
+
+	public ScalaEitherSerializerSnapshotMigrationTest() {
+		super(
+			TestSpecification.<Either<Integer, String>>builder(
+					"1.6-scala-either-serializer",
+					EitherSerializer.class,
+					ScalaEitherSerializerSnapshot.class)
+				.withSerializerProvider(() -> new EitherSerializer<>(IntSerializer.INSTANCE, StringSerializer.INSTANCE))
+				.withSnapshotDataLocation(SNAPSHOT)
+				.withTestData(DATA, 10)
+		);
+	}
+
+}
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-either-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-either-serializer-data
new file mode 100644
index 00000000000..203067c5a04
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-either-serializer-data differ
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-either-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-either-serializer-snapshot
new file mode 100644
index 00000000000..12394060496
Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-either-serializer-snapshot differ


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services