You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:13 UTC

[flink] branch master updated (31685a3 -> 5bcaf2e)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 31685a3  [FLINK-11280] [state backends, tests] Do not set current key before restore in TtlStateTestBase tests
     new beee5b1  [FLINK-11073] [core] Introduce CompositeTypeSerializerSnapshot
     new 8174854  [FLINK-11079] [core] Let ListSerializerSnapshot extend CompositeTypeSerializerSnapshot
     new 9822cc8  [FLINK-11073] [core] Let MapViewSerializerSnapshot be a CompositeTypeSerializerSnapshot
     new 383cc9e  [FLINK-11073] [core] Let ArrayListSerializerSnapshot be a CompositeTypeSerializerSnapshot
     new eda7376  [FLINK-11073] [core] Let LockableTypeSerializerSnapshot be a CompositeTypeSerializerSnapshot
     new faf093d  [FLINK-11073] [core] Let ListViewSerializerSnapshot be a CompositeTypeSerializerSnapshot
     new 5e6664d  [FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot
     new 68fab12  [FLINK-11073] [core] Let ScalaEitherSerializerSnapshot be a CompositeTypeSerializerSnapshot
     new e791b1a  [FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot
     new 301be55  [FLINK-11073] [core] Replace EitherSerializerSnapshot with new JavaEitherSerializerSnapshot
     new 25dc483  [hotfix] [core] Assert current serializer snapshot is expected snapshot class in TypeSerializerSnapshotMigrationTestBase
     new 5bcaf2e  [FLINK-11073] [core] Rename CompositeSerializerSnapshot to NestedSerializersSnapshotDelegate

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../typeutils/CompositeTypeSerializerSnapshot.java | 346 +++++++++++++
 ...java => NestedSerializersSnapshotDelegate.java} |  47 +-
 .../base/CollectionSerializerConfigSnapshot.java   |   9 +-
 .../typeutils/base/GenericArraySerializer.java     |   4 +-
 .../base/GenericArraySerializerConfigSnapshot.java |  34 +-
 .../base/GenericArraySerializerSnapshot.java       |  81 +++
 .../api/common/typeutils/base/ListSerializer.java  |   2 +-
 .../typeutils/base/ListSerializerSnapshot.java     |  56 +--
 .../api/common/typeutils/base/MapSerializer.java   |   2 +-
 .../base/MapSerializerConfigSnapshot.java          |   3 +-
 .../typeutils/base/MapSerializerSnapshot.java      |  60 +--
 .../java/typeutils/runtime/EitherSerializer.java   |   4 +-
 .../runtime/EitherSerializerSnapshot.java          |  28 +-
 .../runtime/JavaEitherSerializerSnapshot.java      |  61 +++
 ...mpositeTypeSerializerSnapshotMigrationTest.java |   8 +-
 .../CompositeTypeSerializerSnapshotTest.java       | 544 +++++++++++++++++++++
 .../TypeSerializerSnapshotMigrationTestBase.java   |   6 +
 .../flink/cep/nfa/sharedbuffer/Lockable.java       |   2 +-
 .../LockableTypeSerializerSnapshot.java            |  56 +--
 .../table/dataview/ListViewSerializerSnapshot.java |  54 +-
 .../table/dataview/MapViewSerializerSnapshot.java  |  55 +--
 .../flink/table/dataview/ListViewSerializer.scala  |   2 +-
 .../flink/table/dataview/MapViewSerializer.scala   |   2 +-
 .../flink/runtime/state/ArrayListSerializer.java   |   2 +-
 .../runtime/state/ArrayListSerializerSnapshot.java |  54 +-
 .../typeutils/ScalaEitherSerializerSnapshot.java   |  62 +--
 .../api/scala/typeutils/EitherSerializer.scala     |   2 +-
 27 files changed, 1223 insertions(+), 363 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
 rename flink-core/src/main/java/org/apache/flink/api/common/typeutils/{CompositeSerializerSnapshot.java => NestedSerializersSnapshotDelegate.java} (79%)
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
 create mode 100644 flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java


[flink] 03/12: [FLINK-11073] [core] Let MapViewSerializerSnapshot be a CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9822cc8a73cc4a54cb401d3ee31ce2e8894a6d6d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:11:11 2018 +0800

    [FLINK-11073] [core] Let MapViewSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../table/dataview/MapViewSerializerSnapshot.java  | 55 ++++++----------------
 .../flink/table/dataview/MapViewSerializer.scala   |  2 +-
 2 files changed, 15 insertions(+), 42 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index f59fc0a..132f42f 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -18,78 +18,51 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.MapView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link TypeSerializerSnapshot} for the {@link MapViewSerializer}.
  *
  * @param <K> the key type of the map entries.
  * @param <V> the value type of the map entries.
  */
-public class MapViewSerializerSnapshot<K, V> implements TypeSerializerSnapshot<MapView<K, V>> {
+public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedMapSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public MapViewSerializerSnapshot() {}
+	public MapViewSerializerSnapshot() {
+		super(MapViewSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public MapViewSerializerSnapshot(TypeSerializer<Map<K, V>> mapSerializer) {
-		this.nestedMapSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(mapSerializer));
+	public MapViewSerializerSnapshot(MapViewSerializer<K, V> mapViewSerializer) {
+		super(mapViewSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<MapView<K, V>> restoreSerializer() {
-		return new MapViewSerializer<>(nestedMapSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<MapView<K, V>> resolveSchemaCompatibility(
-			TypeSerializer<MapView<K, V>> newSerializer) {
-		checkState(nestedMapSerializerSnapshot != null);
-
-		if (newSerializer instanceof MapViewSerializer) {
-			MapViewSerializer<K, V> serializer = (MapViewSerializer<K, V>) newSerializer;
-
-			return nestedMapSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getMapSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedMapSerializerSnapshot.writeCompositeSnapshot(out);
+	protected MapViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<Map<K, V>> mapSerializer = (TypeSerializer<Map<K, V>>) nestedSerializers[0];
+		return new MapViewSerializer<>(mapSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedMapSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getMapSerializer() };
 	}
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
index 89cdf70..e0067c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
@@ -78,7 +78,7 @@ class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K,
     mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
 
   override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
-    new MapViewSerializerSnapshot[K, V](mapSerializer)
+    new MapViewSerializerSnapshot[K, V](this)
 
   // copy and modified from MapSerializer.ensureCompatibility
   override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot[_])


[flink] 06/12: [FLINK-11073] [core] Let ListViewSerializerSnapshot be a CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faf093d33284b3a0e9e7b7ba7267b91503b69891
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:26:13 2018 +0800

    [FLINK-11073] [core] Let ListViewSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../table/dataview/ListViewSerializerSnapshot.java | 54 ++++++----------------
 .../flink/table/dataview/ListViewSerializer.scala  |  2 +-
 2 files changed, 15 insertions(+), 41 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index cca84d2..90468ac 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -18,76 +18,50 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.ListView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link TypeSerializerSnapshot} for the {@link ListViewSerializer}.
  *
  * @param <T> the type of the list elements.
  */
-public final class ListViewSerializerSnapshot<T> implements TypeSerializerSnapshot<ListView<T>> {
+public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedListSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ListViewSerializerSnapshot() {}
+	public ListViewSerializerSnapshot() {
+		super(ListViewSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public ListViewSerializerSnapshot(TypeSerializer<List<T>> listSerializer) {
-		this.nestedListSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(listSerializer));
+	public ListViewSerializerSnapshot(ListViewSerializer<T> listViewSerializer) {
+		super(listViewSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<ListView<T>> restoreSerializer() {
-		return new ListViewSerializer<>(nestedListSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<ListView<T>> resolveSchemaCompatibility(TypeSerializer<ListView<T>> newSerializer) {
-		checkState(nestedListSerializerSnapshot != null);
-
-		if (newSerializer instanceof ListViewSerializer) {
-			ListViewSerializer<T> serializer = (ListViewSerializer<T>) newSerializer;
-
-			return nestedListSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getListSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedListSerializerSnapshot.writeCompositeSnapshot(out);
+	protected ListViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<List<T>> listSerializer = (TypeSerializer<List<T>>) nestedSerializers[0];
+		return new ListViewSerializer<>(listSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedListSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getListSerializer() };
 	}
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
index 246af6c..2d48c3d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
@@ -77,7 +77,7 @@ class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]
     listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
 
   override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
-    new ListViewSerializerSnapshot[T](listSerializer)
+    new ListViewSerializerSnapshot[T](this)
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[ListView[T]] = {


[flink] 02/12: [FLINK-11079] [core] Let ListSerializerSnapshot extend CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 817485473005dfd054467eab4c2a275d833af723
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 15:55:23 2018 +0800

    [FLINK-11079] [core] Let ListSerializerSnapshot extend CompositeTypeSerializerSnapshot
---
 .../base/CollectionSerializerConfigSnapshot.java   |  9 ++--
 .../api/common/typeutils/base/ListSerializer.java  |  2 +-
 .../typeutils/base/ListSerializerSnapshot.java     | 56 ++++++----------------
 3 files changed, 21 insertions(+), 46 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
index 377dd4c..762a441 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CollectionSerializerConfigSnapshot.java
@@ -49,10 +49,13 @@ public final class CollectionSerializerConfigSnapshot<C extends Collection<T>, T
 	@Override
 	public TypeSerializerSchemaCompatibility<C> resolveSchemaCompatibility(TypeSerializer<C> newSerializer) {
 		if (newSerializer instanceof ListSerializer) {
-			ListSerializerSnapshot<T> listSerializerSnapshot =
-				new ListSerializerSnapshot<>(((ListSerializer<T>) newSerializer).getElementSerializer());
+			ListSerializer<T> newListSerializer = (ListSerializer<T>) newSerializer;
+			ListSerializerSnapshot<T> listSerializerSnapshot = new ListSerializerSnapshot<>(newListSerializer);
 
-			return listSerializerSnapshot.resolveSchemaCompatibility((ListSerializer) newSerializer);
+			@SuppressWarnings("unchecked")
+			TypeSerializerSchemaCompatibility<C> result = (TypeSerializerSchemaCompatibility<C>)
+				listSerializerSnapshot.resolveSchemaCompatibility(newListSerializer);
+			return result;
 		} else {
 			return super.resolveSchemaCompatibility(newSerializer);
 		}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 08b3333..5178031 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -175,6 +175,6 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> {
 
 	@Override
 	public TypeSerializerSnapshot<List<T>> snapshotConfiguration() {
-		return new ListSerializerSnapshot<>(elementSerializer);
+		return new ListSerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
index 5f89d94..f90e22a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializerSnapshot.java
@@ -18,73 +18,45 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link ListSerializer}.
  */
-public class ListSerializerSnapshot<T> implements TypeSerializerSnapshot<List<T>> {
+public class ListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<List<T>, ListSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ListSerializerSnapshot() {}
+	public ListSerializerSnapshot() {
+		super(ListSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public ListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+	public ListSerializerSnapshot(ListSerializer<T> listSerializer) {
+		super(listSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<List<T>> restoreSerializer() {
-		return new ListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<List<T>> resolveSchemaCompatibility(TypeSerializer<List<T>> newSerializer) {
-		checkState(nestedElementSerializerSnapshot != null);
-
-		if (newSerializer instanceof ListSerializer) {
-			ListSerializer<T> serializer = (ListSerializer<T>) newSerializer;
-
-			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getElementSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+	protected ListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
+		return new ListSerializer<>(elementSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(ListSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
 	}
 }


[flink] 11/12: [hotfix] [core] Assert current serializer snapshot is expected snapshot class in TypeSerializerSnapshotMigrationTestBase

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25dc4832324bb0be0731747c5856441a6b984a95
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Dec 6 19:14:17 2018 +0800

    [hotfix] [core] Assert current serializer snapshot is expected snapshot class in TypeSerializerSnapshotMigrationTestBase
    
    This strengthens the test base to make use of the provided snapshot
    class in the test specifications, to assert that serializers are really
    using the expected classes for their current serializer snapshots.
---
 .../common/typeutils/TypeSerializerSnapshotMigrationTestBase.java   | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
index 186f504..ea18309 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
@@ -103,6 +103,8 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 		TypeSerializer<ElementT> restoredSerializer = previousSnapshot.restoreSerializer();
 
 		TypeSerializerSnapshot<ElementT> nextSnapshot = restoredSerializer.snapshotConfiguration();
+		assertThat(nextSnapshot, instanceOf(testSpecification.snapshotClass));
+
 		TypeSerializerSnapshot<ElementT> nextSnapshotDeserialized = writeAndThenReadTheSnapshot(restoredSerializer, nextSnapshot);
 
 		assertThat(nextSnapshotDeserialized, allOf(
@@ -247,6 +249,10 @@ public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends
 			return resourcePath(this.snapshotDataLocation);
 		}
 
+		public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
+			return snapshotClass;
+		}
+
 		@Override
 		public String toString() {
 			return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());


[flink] 12/12: [FLINK-11073] [core] Rename CompositeSerializerSnapshot to NestedSerializersSnapshotDelegate

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bcaf2e0fdf9c05f257eeb67d67bf1320faeea8f
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 7 13:28:19 2019 +0100

    [FLINK-11073] [core] Rename CompositeSerializerSnapshot to NestedSerializersSnapshotDelegate
    
    After introducing CompositeTypeSerializerSnapshot, the
    CompositeSerializerSnapshot has been reworked to only deal with concerns
    of delegating reading and writing of the nested serializers' snapshots.
    It no longer deals with resolving the final compatibility result for the
    outer composite serializer.
    
    Therefore, it is renamed properly as NestedSerializersSnapshotDelegate,
    and also annotated as an internal class, since we want users to use the
    more powerful CompositeTypeSerializerSnapshot instead.
    
    This closes #7422.
---
 .../typeutils/CompositeTypeSerializerSnapshot.java | 12 +++----
 ...java => NestedSerializersSnapshotDelegate.java} | 37 ++++++++++++----------
 .../base/GenericArraySerializerConfigSnapshot.java | 14 ++++----
 .../runtime/EitherSerializerSnapshot.java          | 16 +++++-----
 4 files changed, 41 insertions(+), 38 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index 71c0836..c73e24c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -99,7 +99,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 
 	private static final int HIGHEST_LEGACY_READ_VERSION = 2;
 
-	private CompositeSerializerSnapshot compositeSerializerSnapshot;
+	private NestedSerializersSnapshotDelegate nestedSerializersSnapshotDelegate;
 
 	private final Class<S> correspondingSerializerClass;
 
@@ -120,7 +120,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	@SuppressWarnings("unchecked")
 	public CompositeTypeSerializerSnapshot(S serializerInstance) {
 		Preconditions.checkNotNull(serializerInstance);
-		this.compositeSerializerSnapshot = new CompositeSerializerSnapshot(getNestedSerializers(serializerInstance));
+		this.nestedSerializersSnapshotDelegate = new NestedSerializersSnapshotDelegate(getNestedSerializers(serializerInstance));
 		this.correspondingSerializerClass = (Class<S>) serializerInstance.getClass();
 	}
 
@@ -132,7 +132,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 	@Override
 	public final void writeSnapshot(DataOutputView out) throws IOException {
 		internalWriteOuterSnapshot(out);
-		compositeSerializerSnapshot.writeCompositeSnapshot(out);
+		nestedSerializersSnapshotDelegate.writeNestedSerializerSnapshots(out);
 	}
 
 	@Override
@@ -142,7 +142,7 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 		} else {
 			legacyInternalReadOuterSnapshot(readVersion, in, userCodeClassLoader);
 		}
-		this.compositeSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+		this.nestedSerializersSnapshotDelegate = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, userCodeClassLoader);
 	}
 
 	@Override
@@ -161,14 +161,14 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
 		// since outer configuration is compatible, the final compatibility result depends only on the nested serializers
 		return constructFinalSchemaCompatibilityResult(
 			getNestedSerializers(castedNewSerializer),
-			compositeSerializerSnapshot.getNestedSerializerSnapshots());
+			nestedSerializersSnapshotDelegate.getNestedSerializerSnapshots());
 	}
 
 	@Override
 	public final TypeSerializer<T> restoreSerializer() {
 		@SuppressWarnings("unchecked")
 		TypeSerializer<T> serializer = (TypeSerializer<T>)
-			createOuterSerializerWithNestedSerializers(compositeSerializerSnapshot.getRestoreSerializers());
+			createOuterSerializerWithNestedSerializers(nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
 
 		return serializer;
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
similarity index 81%
rename from flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
rename to flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
index 5b3f775..a4dcdd2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/NestedSerializersSnapshotDelegate.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -30,21 +30,21 @@ import java.util.List;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A CompositeSerializerSnapshot represents the snapshots of multiple serializers that are used
+ * A NestedSerializersSnapshotDelegate represents the snapshots of multiple serializers that are used
  * by an outer serializer. Examples would be tuples, where the outer serializer is the tuple
- * format serializer, an the CompositeSerializerSnapshot holds the serializers for the
+ * format serializer, and the NestedSerializersSnapshotDelegate holds the serializers for the
  * different tuple fields.
  *
- * <p>The CompositeSerializerSnapshot does not implement the {@link TypeSerializerSnapshot} interface.
+ * <p>The NestedSerializersSnapshotDelegate does not implement the {@link TypeSerializerSnapshot} interface.
  * It is not meant to be inherited from, but to be composed with a serializer snapshot implementation.
  *
- * <p>The CompositeSerializerSnapshot has its own versioning internally, it does not couple its
+ * <p>The NestedSerializersSnapshotDelegate has its own versioning internally, it does not couple its
  * versioning to the versioning of the TypeSerializerSnapshot that builds on top of this class.
- * That way, the CompositeSerializerSnapshot and enclosing TypeSerializerSnapshot the can evolve
+ * That way, the NestedSerializersSnapshotDelegate and the enclosing TypeSerializerSnapshot can evolve
  * their formats independently.
  */
-@PublicEvolving
-public class CompositeSerializerSnapshot {
+@Internal
+public class NestedSerializersSnapshotDelegate {
 
 	/** Magic number for integrity checks during deserialization. */
 	private static final int MAGIC_NUMBER = 1333245;
@@ -58,14 +58,14 @@ public class CompositeSerializerSnapshot {
 	/**
 	 * Constructor to create a snapshot for writing.
 	 */
-	public CompositeSerializerSnapshot(TypeSerializer<?>... serializers) {
+	public NestedSerializersSnapshotDelegate(TypeSerializer<?>... serializers) {
 		this.nestedSnapshots = TypeSerializerUtils.snapshotBackwardsCompatible(serializers);
 	}
 
 	/**
 	 * Constructor to create a snapshot during deserialization.
 	 */
-	private CompositeSerializerSnapshot(TypeSerializerSnapshot<?>[] snapshots) {
+	private NestedSerializersSnapshotDelegate(TypeSerializerSnapshot<?>[] snapshots) {
 		this.nestedSnapshots = snapshots;
 	}
 
@@ -77,14 +77,14 @@ public class CompositeSerializerSnapshot {
 	 * Produces a restore serializer from each contained serializer configuration snapshot.
 	 * The serializers are returned in the same order as the snapshots are stored.
 	 */
-	public TypeSerializer<?>[] getRestoreSerializers() {
+	public TypeSerializer<?>[] getRestoredNestedSerializers() {
 		return snapshotsToRestoreSerializers(nestedSnapshots);
 	}
 
 	/**
 	 * Creates the restore serializer from the pos-th config snapshot.
 	 */
-	public <T> TypeSerializer<T> getRestoreSerializer(int pos) {
+	public <T> TypeSerializer<T> getRestoredNestedSerializer(int pos) {
 		checkArgument(pos < nestedSnapshots.length);
 
 		@SuppressWarnings("unchecked")
@@ -105,6 +105,9 @@ public class CompositeSerializerSnapshot {
 	/**
 	 * Resolves the compatibility of the nested serializer snapshots with the nested
 	 * serializers of the new outer serializer.
+	 *
+	 * @deprecated this method will be removed in the future. Resolving compatibility for nested
+	 *             serializers is now handled by {@link CompositeTypeSerializerSnapshot}.
 	 */
 	@Deprecated
 	public <T> TypeSerializerSchemaCompatibility<T> resolveCompatibilityWithNested(
@@ -145,7 +148,7 @@ public class CompositeSerializerSnapshot {
 	/**
 	 * Writes the composite snapshot of all the contained serializers.
 	 */
-	public final void writeCompositeSnapshot(DataOutputView out) throws IOException {
+	public final void writeNestedSerializerSnapshots(DataOutputView out) throws IOException {
 		out.writeInt(MAGIC_NUMBER);
 		out.writeInt(VERSION);
 
@@ -158,7 +161,7 @@ public class CompositeSerializerSnapshot {
 	/**
 	 * Reads the composite snapshot of all the contained serializers.
 	 */
-	public static CompositeSerializerSnapshot readCompositeSnapshot(DataInputView in, ClassLoader cl) throws IOException {
+	public static NestedSerializersSnapshotDelegate readNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws IOException {
 		final int magicNumber = in.readInt();
 		if (magicNumber != MAGIC_NUMBER) {
 			throw new IOException(String.format("Corrupt data, magic number mismatch. Expected %8x, found %8x",
@@ -177,14 +180,14 @@ public class CompositeSerializerSnapshot {
 			nestedSnapshots[i] = TypeSerializerSnapshot.readVersionedSnapshot(in, cl);
 		}
 
-		return new CompositeSerializerSnapshot(nestedSnapshots);
+		return new NestedSerializersSnapshotDelegate(nestedSnapshots);
 	}
 
 	/**
 	 * Reads the composite snapshot of all the contained serializers in a way that is compatible
 	 * with Version 1 of the deprecated {@link CompositeTypeSerializerConfigSnapshot}.
 	 */
-	public static CompositeSerializerSnapshot legacyReadProductSnapshots(DataInputView in, ClassLoader cl) throws IOException {
+	public static NestedSerializersSnapshotDelegate legacyReadNestedSerializerSnapshots(DataInputView in, ClassLoader cl) throws IOException {
 		@SuppressWarnings("deprecation")
 		final List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndSnapshots =
 				TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, cl);
@@ -193,7 +196,7 @@ public class CompositeSerializerSnapshot {
 				.map(t -> t.f1)
 				.toArray(TypeSerializerSnapshot<?>[]::new);
 
-		return new CompositeSerializerSnapshot(nestedSnapshots);
+		return new NestedSerializersSnapshotDelegate(nestedSnapshots);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index 8cbe76c..cfc2e98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -54,7 +54,7 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 
 	/** Snapshot handling for the component serializer snapshot. */
 	@Nullable
-	private CompositeSerializerSnapshot nestedSnapshot;
+	private NestedSerializersSnapshotDelegate nestedSnapshot;
 
 	/**
 	 * Constructor for read instantiation.
@@ -67,7 +67,7 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 	 */
 	public GenericArraySerializerConfigSnapshot(GenericArraySerializer<C> serializer) {
 		this.componentClass = serializer.getComponentClass();
-		this.nestedSnapshot = new CompositeSerializerSnapshot(serializer.getComponentSerializer());
+		this.nestedSnapshot = new NestedSerializersSnapshotDelegate(serializer.getComponentSerializer());
 	}
 
 	// ------------------------------------------------------------------------
@@ -81,7 +81,7 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 	public void writeSnapshot(DataOutputView out) throws IOException {
 		checkState(componentClass != null && nestedSnapshot != null);
 		out.writeUTF(componentClass.getName());
-		nestedSnapshot.writeCompositeSnapshot(out);
+		nestedSnapshot.writeNestedSerializerSnapshots(out);
 	}
 
 	@Override
@@ -99,7 +99,7 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 	}
 
 	private void readV1(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.legacyReadProductSnapshots(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, classLoader);
 
 		try (DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
 			componentClass = InstantiationUtil.deserializeObject(inViewWrapper, classLoader);
@@ -111,13 +111,13 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 
 	private void readV2(DataInputView in, ClassLoader classLoader) throws IOException {
 		componentClass = InstantiationUtil.resolveClassByName(in, classLoader);
-		nestedSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader);
 	}
 
 	@Override
 	public TypeSerializer<C[]> restoreSerializer() {
 		checkState(componentClass != null && nestedSnapshot != null);
-		return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoreSerializer(0));
+		return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 3b7a8e7..1779ec8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -47,7 +47,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 
 	/** Snapshot handling for the component serializer snapshot. */
 	@Nullable
-	private CompositeSerializerSnapshot nestedSnapshot;
+	private NestedSerializersSnapshotDelegate nestedSnapshot;
 
 	/**
 	 * Constructor for read instantiation.
@@ -62,7 +62,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 			TypeSerializer<L> leftSerializer,
 			TypeSerializer<R> rightSerializer) {
 
-		this.nestedSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+		this.nestedSnapshot = new NestedSerializersSnapshotDelegate(leftSerializer, rightSerializer);
 	}
 
 	// ------------------------------------------------------------------------
@@ -75,7 +75,7 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 	@Override
 	public void writeSnapshot(DataOutputView out) throws IOException {
 		checkState(nestedSnapshot != null);
-		nestedSnapshot.writeCompositeSnapshot(out);
+		nestedSnapshot.writeNestedSerializerSnapshots(out);
 	}
 
 	@Override
@@ -93,19 +93,19 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 	}
 
 	private void readV1(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.legacyReadProductSnapshots(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, classLoader);
 	}
 
 	private void readV2(DataInputView in, ClassLoader classLoader) throws IOException {
-		nestedSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, classLoader);
+		nestedSnapshot = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader);
 	}
 
 	@Override
 	public TypeSerializer<Either<L, R>> restoreSerializer() {
 		checkState(nestedSnapshot != null);
 		return new EitherSerializer<>(
-				nestedSnapshot.getRestoreSerializer(0),
-				nestedSnapshot.getRestoreSerializer(1));
+				nestedSnapshot.getRestoredNestedSerializer(0),
+				nestedSnapshot.getRestoredNestedSerializer(1));
 	}
 
 	@Override


[flink] 01/12: [FLINK-11073] [core] Introduce CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit beee5b168875fc54bc09780942c4fc724ffccbf0
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 15:51:12 2018 +0800

    [FLINK-11073] [core] Introduce CompositeTypeSerializerSnapshot
    
    The CompositeTypeSerializerSnapshot encapsulates logic for handling
    writing, reading, and deriving final compatibility results for composite
    serializers that have multiple nested serializers as well as some
    static outer configuration (e.g. type class in the
    GenericArraySerializer).
    
    This base class has its own versioning for the format in which it writes
    the outer snapshot and the nested serializer snapshots. The version of
    the serialization format of this base class is defined
    by getCurrentVersion(). This is independent of the version in which
    subclasses write their outer snapshot, defined by
    getCurrentOuterSnapshotVersion().
    
    This means that the outer snapshot's version can be maintained only
    taking into account changes in how the outer snapshot is written.
    Any changes in the base format do not require upticks in the outer
    snapshot's version.
---
 .../typeutils/CompositeSerializerSnapshot.java     |  10 +
 .../typeutils/CompositeTypeSerializerSnapshot.java | 346 +++++++++++++
 .../CompositeTypeSerializerSnapshotTest.java       | 544 +++++++++++++++++++++
 3 files changed, 900 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
index 93f5a70..5b3f775 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializerSnapshot.java
@@ -94,9 +94,19 @@ public class CompositeSerializerSnapshot {
 	}
 
 	/**
+	 * Returns the snapshots of the nested serializers.
+	 *
+	 * @return the snapshots of the nested serializers.
+	 */
+	public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
+		return nestedSnapshots;
+	}
+
+	/**
 	 * Resolves the compatibility of the nested serializer snapshots with the nested
 	 * serializers of the new outer serializer.
 	 */
+	@Deprecated
 	public <T> TypeSerializerSchemaCompatibility<T> resolveCompatibilityWithNested(
 			TypeSerializerSchemaCompatibility<?> outerCompatibility,
 			TypeSerializer<?>... newNestedSerializers) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
new file mode 100644
index 0000000..71c0836
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link CompositeTypeSerializerSnapshot} is a convenient serializer snapshot class that can be used by
+ * simple serializers which 1) delegate their serialization to multiple nested serializers, and 2) may contain
+ * some extra static information that needs to be persisted as part of their snapshot.
+ *
+ * <p>Examples for this would be the {@link ListSerializer}, {@link MapSerializer}, {@link EitherSerializer}, etc.,
+ * in which case the serializer, called the "outer" serializer in this context, has only some nested serializers that
+ * need to be persisted as its snapshot, and nothing else that needs to be persisted as the "outer" snapshot.
+ * An example which has non-empty outer snapshots would be the {@link GenericArraySerializer}, which beyond the
+ * nested component serializer, also contains a class of the component type that needs to be persisted.
+ *
+ * <p>Serializers that do have some outer snapshot need to make sure to implement the methods
+ * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, and
+ * {@link #isOuterSnapshotCompatible(TypeSerializer)} when using this class as the base for their serializer snapshot
+ * class. By default, the base implementations of these methods are empty, i.e. this class assumes that
+ * subclasses do not have any outer snapshot that needs to be persisted.
+ *
+ * <h2>Snapshot Versioning</h2>
+ *
+ * <p>This base class has its own versioning for the format in which it writes the outer snapshot and the
+ * nested serializer snapshots. The version of the serialization format of this base class is defined
+ * by {@link #getCurrentVersion()}. This is independent of the version in which subclasses write their outer snapshot,
+ * defined by {@link #getCurrentOuterSnapshotVersion()}.
+ * This means that the outer snapshot's version can be maintained only taking into account changes in how the
+ * outer snapshot is written. Any changes in the base format do not require upticks in the outer snapshot's version.
+ *
+ * <h2>Serialization Format</h2>
+ *
+ * <p>The current version of the serialization format of a {@link CompositeTypeSerializerSnapshot} is as follows:
+ *
+ * <pre>{@code
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | CompositeTypeSerializerSnapshot | CompositeTypeSerializerSnapshot |          Outer snapshot         |
+ * |           version               |          MAGIC_NUMBER           |              version            |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                               Outer snapshot                                        |
+ * |                                   #writeOuterSnapshot(DataOutputView out)                           |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |      Delegate MAGIC_NUMBER      |         Delegate version        |     Num. nested serializers     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                                     Nested serializer snapshots                                     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }</pre>
+ *
+ * @param <T> The data type that the originating serializer of this snapshot serializes.
+ * @param <S> The type of the originating serializer.
+ */
+@PublicEvolving
+public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer> implements TypeSerializerSnapshot<T> {
+
+	/** Magic number for integrity checks during deserialization. */
+	private static final int MAGIC_NUMBER = 911108;
+
+	/**
+	 * Current version of the base serialization format.
+	 *
+	 * <p>NOTE: We start from version 3. This version is represented by the {@link #getCurrentVersion()} method.
+	 * Previously, this method was used to represent the outer snapshot's version (now, represented
+	 * by the {@link #getCurrentOuterSnapshotVersion()} method).
+	 *
+	 * <p>To bridge this transition, we set the starting version of the base format to be at least
+	 * larger than the highest version of previously defined values in implementing subclasses,
+	 * which was {@link #HIGHEST_LEGACY_READ_VERSION}. This allows us to identify legacy deserialization paths,
+	 * which did not contain versioning for the base format, simply by checking if the read
+	 * version of the snapshot is smaller than or equal to {@link #HIGHEST_LEGACY_READ_VERSION}.
+	 */
+	private static final int VERSION = 3;
+
+	private static final int HIGHEST_LEGACY_READ_VERSION = 2;
+
+	private CompositeSerializerSnapshot compositeSerializerSnapshot;
+
+	private final Class<S> correspondingSerializerClass;
+
+	/**
+	 * Constructor to be used for read instantiation.
+	 *
+	 * @param correspondingSerializerClass the expected class of the new serializer.
+	 */
+	public CompositeTypeSerializerSnapshot(Class<S> correspondingSerializerClass) {
+		this.correspondingSerializerClass = Preconditions.checkNotNull(correspondingSerializerClass);
+	}
+
+	/**
+	 * Constructor to be used for writing the snapshot.
+	 *
+	 * @param serializerInstance an instance of the originating serializer of this snapshot.
+	 */
+	@SuppressWarnings("unchecked")
+	public CompositeTypeSerializerSnapshot(S serializerInstance) {
+		Preconditions.checkNotNull(serializerInstance);
+		this.compositeSerializerSnapshot = new CompositeSerializerSnapshot(getNestedSerializers(serializerInstance));
+		this.correspondingSerializerClass = (Class<S>) serializerInstance.getClass();
+	}
+
+	@Override
+	public final int getCurrentVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public final void writeSnapshot(DataOutputView out) throws IOException {
+		internalWriteOuterSnapshot(out);
+		compositeSerializerSnapshot.writeCompositeSnapshot(out);
+	}
+
+	@Override
+	public final void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		if (readVersion > HIGHEST_LEGACY_READ_VERSION) {
+			internalReadOuterSnapshot(in, userCodeClassLoader);
+		} else {
+			legacyInternalReadOuterSnapshot(readVersion, in, userCodeClassLoader);
+		}
+		this.compositeSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	}
+
+	@Override
+	public final TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+		if (newSerializer.getClass() != correspondingSerializerClass) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		S castedNewSerializer = correspondingSerializerClass.cast(newSerializer);
+
+		// check that outer configuration is compatible; if not, short circuit result
+		if (!isOuterSnapshotCompatible(castedNewSerializer)) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+
+		// since outer configuration is compatible, the final compatibility result depends only on the nested serializers
+		return constructFinalSchemaCompatibilityResult(
+			getNestedSerializers(castedNewSerializer),
+			compositeSerializerSnapshot.getNestedSerializerSnapshots());
+	}
+
+	@Override
+	public final TypeSerializer<T> restoreSerializer() {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> serializer = (TypeSerializer<T>)
+			createOuterSerializerWithNestedSerializers(compositeSerializerSnapshot.getRestoreSerializers());
+
+		return serializer;
+	}
+
+	// ------------------------------------------------------------------------------------------
+	//  Outer serializer access methods
+	// ------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the version of the current outer snapshot's written binary format.
+	 *
+	 * @return the version of the current outer snapshot's written binary format.
+	 */
+	protected abstract int getCurrentOuterSnapshotVersion();
+
+	/**
+	 * Gets the nested serializers from the outer serializer.
+	 *
+	 * @param outerSerializer the outer serializer.
+	 *
+	 * @return the nested serializers.
+	 */
+	protected abstract TypeSerializer<?>[] getNestedSerializers(S outerSerializer);
+
+	/**
+	 * Creates an instance of the outer serializer with a given array of its nested serializers.
+	 *
+	 * @param nestedSerializers array of nested serializers to create the outer serializer with.
+	 *
+	 * @return an instance of the outer serializer.
+	 */
+	protected abstract S createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers);
+
+	// ------------------------------------------------------------------------------------------
+	//  Outer snapshot methods; need to be overridden if outer snapshot is not empty,
+	//  or in other words, the outer serializer has extra configuration beyond its nested serializers.
+	// ------------------------------------------------------------------------------------------
+
+	/**
+	 * Writes the outer snapshot, i.e. any information beyond the nested serializers of the outer serializer.
+	 *
+	 * <p>The base implementation of this method writes nothing, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
+	 * some extra information that needs to be persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * need to be implemented.
+	 *
+	 * @param out the {@link DataOutputView} to write the outer snapshot to.
+	 */
+	protected void writeOuterSnapshot(DataOutputView out) throws IOException {}
+
+	/**
+	 * Reads the outer snapshot, i.e. any information beyond the nested serializers of the outer serializer.
+	 *
+	 * <p>The base implementation of this method reads nothing, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information. Otherwise, if the outer serializer contains
+	 * some extra information that has been persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
+	 * need to be implemented.
+	 *
+	 * @param readOuterSnapshotVersion the read version of the outer snapshot.
+	 * @param in the {@link DataInputView} to read the outer snapshot from.
+	 * @param userCodeClassLoader the user code class loader.
+	 */
+	protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {}
+
+	/**
+	 * Checks whether the outer snapshot is compatible with a given new serializer.
+	 *
+	 * <p>The base implementation of this method just returns {@code true}, i.e. it assumes that the outer serializer
+	 * only has nested serializers and no extra information, and therefore the result of the check must always
+	 * be true. Otherwise, if the outer serializer contains
+	 * some extra information that has been persisted as part of the serializer snapshot, this
+	 * must be overridden. Note that this method and the corresponding methods
+	 * {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}
+	 * need to be implemented.
+	 *
+	 * @param newSerializer the new serializer, which contains the new outer information to check against.
+	 *
+	 * @return a flag indicating whether or not the new serializer's outer information is compatible with the one
+	 *         written in this snapshot.
+	 */
+	protected boolean isOuterSnapshotCompatible(S newSerializer) {
+		return true;
+	}
+
+	// ------------------------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------------------------
+
+	private void internalWriteOuterSnapshot(DataOutputView out) throws IOException {
+		out.writeInt(MAGIC_NUMBER);
+		out.writeInt(getCurrentOuterSnapshotVersion());
+
+		writeOuterSnapshot(out);
+	}
+
+	private void internalReadOuterSnapshot(DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		final int magicNumber = in.readInt();
+		if (magicNumber != MAGIC_NUMBER) {
+			throw new IOException(String.format("Corrupt data, magic number mismatch. Expected %8x, found %8x",
+				MAGIC_NUMBER, magicNumber));
+		}
+
+		final int outerSnapshotVersion = in.readInt();
+		readOuterSnapshot(outerSnapshotVersion, in, userCodeClassLoader);
+	}
+
+	private void legacyInternalReadOuterSnapshot(
+			int legacyReadVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+
+		// legacy versions did not contain the pre-fixed magic numbers; just read the outer snapshot
+		readOuterSnapshot(legacyReadVersion, in, userCodeClassLoader);
+	}
+
+	private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
+			TypeSerializer<?>[] newNestedSerializers,
+			TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
+
+		Preconditions.checkArgument(newNestedSerializers.length == nestedSerializerSnapshots.length,
+			"Different number of new serializers and existing serializer snapshots.");
+
+		TypeSerializer<?>[] reconfiguredNestedSerializers = new TypeSerializer[newNestedSerializers.length];
+
+		// check nested serializers for compatibility
+		boolean nestedSerializerRequiresMigration = false;
+		boolean hasReconfiguredNestedSerializers = false;
+		for (int i = 0; i < nestedSerializerSnapshots.length; i++) {
+			TypeSerializerSchemaCompatibility<?> compatibility =
+				resolveCompatibility(newNestedSerializers[i], nestedSerializerSnapshots[i]);
+
+			// if any one of the new nested serializers is incompatible, we can just short circuit the result
+			if (compatibility.isIncompatible()) {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			}
+
+			if (compatibility.isCompatibleAfterMigration()) {
+				nestedSerializerRequiresMigration = true;
+			} else if (compatibility.isCompatibleWithReconfiguredSerializer()) {
+				hasReconfiguredNestedSerializers = true;
+				reconfiguredNestedSerializers[i] = compatibility.getReconfiguredSerializer();
+			} else if (compatibility.isCompatibleAsIs()) {
+				reconfiguredNestedSerializers[i] = newNestedSerializers[i];
+			} else {
+				throw new IllegalStateException("Undefined compatibility type.");
+			}
+		}
+
+		if (nestedSerializerRequiresMigration) {
+			return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+		}
+
+		if (hasReconfiguredNestedSerializers) {
+			@SuppressWarnings("unchecked")
+			TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(reconfiguredNestedSerializers);
+			return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
+		}
+
+		// ends up here if everything is compatible as is
+		return TypeSerializerSchemaCompatibility.compatibleAsIs();
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility(
+		TypeSerializer<?> serializer,
+		TypeSerializerSnapshot<?> snapshot) {
+
+		TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer;
+		TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot;
+
+		return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
new file mode 100644
index 0000000..0f77e3d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Test suite for the {@link CompositeTypeSerializerSnapshot}.
+ */
+public class CompositeTypeSerializerSnapshotTest {
+
+	// ------------------------------------------------------------------------------------------------
+	//  Scope: tests CompositeTypeSerializerSnapshot#resolveSchemaCompatibility
+	// ------------------------------------------------------------------------------------------------
+
+	@Test
+	public void testIncompatiblePrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		final TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
+			new NestedSerializer(TargetCompatibility.INCOMPATIBLE),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER)
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isIncompatible());
+	}
+
+	@Test
+	public void testCompatibleAfterMigrationPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleAfterMigration());
+	}
+
+	@Test
+	public void testCompatibleWithReconfiguredSerializerPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());
+
+		TestCompositeTypeSerializer reconfiguredSerializer =
+			(TestCompositeTypeSerializer) compatibility.getReconfiguredSerializer();
+		TypeSerializer<?>[] reconfiguredNestedSerializers = reconfiguredSerializer.getNestedSerializers();
+		// only the nested serializer at index 1 should have been replaced by a ReconfiguredNestedSerializer
+		Assert.assertTrue(reconfiguredNestedSerializers[0].getClass() == NestedSerializer.class);
+		Assert.assertTrue(reconfiguredNestedSerializers[1].getClass() == ReconfiguredNestedSerializer.class);
+		Assert.assertTrue(reconfiguredNestedSerializers[2].getClass() == NestedSerializer.class);
+	}
+
+	@Test
+	public void testCompatibleAsIsPrecedence() throws IOException {
+		final String OUTER_CONFIG = "outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				OUTER_CONFIG,
+				OUTER_CONFIG);
+
+		Assert.assertTrue(compatibility.isCompatibleAsIs());
+	}
+
+	@Test
+	public void testOuterSnapshotCompatibilityPrecedence() throws IOException {
+		final String INIT_OUTER_CONFIG = "outer-config";
+		final String INCOMPAT_OUTER_CONFIG = "incompat-outer-config";
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+		};
+
+		TypeSerializerSchemaCompatibility<String> compatibility =
+			snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+				testNestedSerializers,
+				INIT_OUTER_CONFIG,
+				INCOMPAT_OUTER_CONFIG);
+
+		// even though nested serializers are compatible, incompatibility of the outer
+		// snapshot should have higher precedence in the final result
+		Assert.assertTrue(compatibility.isIncompatible());
+	}
+
+	private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+			TypeSerializer<?>[] testNestedSerializers,
+			String initialOuterConfiguration,
+			String newOuterConfiguration) throws IOException {
+		TestCompositeTypeSerializer testSerializer =
+			new TestCompositeTypeSerializer(initialOuterConfiguration, testNestedSerializers);
+
+		TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
+
+		DataOutputSerializer out = new DataOutputSerializer(128);
+		TypeSerializerSnapshot.writeVersionedSnapshot(out, testSerializerSnapshot);
+
+		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		testSerializerSnapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+			in, Thread.currentThread().getContextClassLoader());
+
+		TestCompositeTypeSerializer newTestSerializer =
+			new TestCompositeTypeSerializer(newOuterConfiguration, testNestedSerializers);
+		return testSerializerSnapshot.resolveSchemaCompatibility(newTestSerializer);
+	}
+
+	// ------------------------------------------------------------------------------------------------
+	//  Scope: tests CompositeTypeSerializerSnapshot#restoreSerializer
+	// ------------------------------------------------------------------------------------------------
+
+	@Test
+	public void testRestoreCompositeTypeSerializer() throws IOException {
+		// the target compatibilities of the nested serializers do not matter,
+		// because we are only testing the restoration of the serializer
+		TypeSerializer<?>[] testNestedSerializers = {
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+			new NestedSerializer(TargetCompatibility.INCOMPATIBLE),
+			new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION)
+		};
+
+		TestCompositeTypeSerializer testSerializer = new TestCompositeTypeSerializer("outer-config", testNestedSerializers);
+
+		TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
+
+		DataOutputSerializer out = new DataOutputSerializer(128);
+		TypeSerializerSnapshot.writeVersionedSnapshot(out, testSerializerSnapshot);
+
+		DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+		testSerializerSnapshot = TypeSerializerSnapshot.readVersionedSnapshot(
+			in, Thread.currentThread().getContextClassLoader());
+
+		// now, restore the composite type serializer;
+		// every restored nested serializer should be a RestoredNestedSerializer
+		testSerializer = (TestCompositeTypeSerializer) testSerializerSnapshot.restoreSerializer();
+		Assert.assertTrue(testSerializer.getNestedSerializers()[0].getClass() == RestoredNestedSerializer.class);
+		Assert.assertTrue(testSerializer.getNestedSerializers()[1].getClass() == RestoredNestedSerializer.class);
+		Assert.assertTrue(testSerializer.getNestedSerializers()[2].getClass() == RestoredNestedSerializer.class);
+	}
+
+	// ------------------------------------------------------------------------------------------------
+	//  Test utilities
+	// ------------------------------------------------------------------------------------------------
+
+	/**
+	 * A simple composite serializer used for testing.
+	 * It can be configured with an array of nested serializers, as well as outer configuration (represented as String).
+	 */
+	public static class TestCompositeTypeSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = -545688468997398105L;
+
+		private static final StringSerializer delegateSerializer = StringSerializer.INSTANCE;
+
+		private final String outerConfiguration;
+
+		private final TypeSerializer<?>[] nestedSerializers;
+
+		TestCompositeTypeSerializer(
+				String outerConfiguration,
+				TypeSerializer<?>[] nestedSerializers) {
+			this.outerConfiguration = outerConfiguration;
+			this.nestedSerializers = nestedSerializers;
+		}
+
+		public String getOuterConfiguration() {
+			return outerConfiguration;
+		}
+
+		TypeSerializer<?>[] getNestedSerializers() {
+			return nestedSerializers;
+		}
+
+		@Override
+		public TypeSerializerSnapshot<String> snapshotConfiguration() {
+			return new TestCompositeTypeSerializerSnapshot(this);
+		}
+
+		// --------------------------------------------------------------------------------
+		//  Serialization delegation
+		// --------------------------------------------------------------------------------
+
+		@Override
+		public String deserialize(String reuse, DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(reuse, source);
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(source);
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+			delegateSerializer.serialize(record, target);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			delegateSerializer.copy(source, target);
+		}
+
+		@Override
+		public String copy(String from) {
+			return delegateSerializer.copy(from);
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return delegateSerializer.copy(from, reuse);
+		}
+
+		@Override
+		public String createInstance() {
+			return delegateSerializer.createInstance();
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return this;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (canEqual(obj)) {
+				return Arrays.equals(nestedSerializers, ((TestCompositeTypeSerializer) obj).getNestedSerializers());
+			}
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return Arrays.hashCode(nestedSerializers);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof TestCompositeTypeSerializer;
+		}
+	}
+
+	/**
+	 * Snapshot class for the {@link TestCompositeTypeSerializer}.
+	 */
+	public static class TestCompositeTypeSerializerSnapshot extends CompositeTypeSerializerSnapshot<String, TestCompositeTypeSerializer> {
+
+		private String outerConfiguration;
+
+		public TestCompositeTypeSerializerSnapshot() {
+			super(TestCompositeTypeSerializer.class);
+		}
+
+		TestCompositeTypeSerializerSnapshot(TestCompositeTypeSerializer serializer) {
+			super(serializer);
+			this.outerConfiguration = serializer.getOuterConfiguration();
+		}
+
+		@Override
+		protected TestCompositeTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+			return new TestCompositeTypeSerializer(outerConfiguration, nestedSerializers);
+		}
+
+		@Override
+		protected TypeSerializer<?>[] getNestedSerializers(TestCompositeTypeSerializer outerSerializer) {
+			return outerSerializer.getNestedSerializers();
+		}
+
+		@Override
+		protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+			out.writeUTF(outerConfiguration);
+		}
+
+		@Override
+		public void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			Assert.assertEquals(getCurrentOuterSnapshotVersion(), readOuterSnapshotVersion);
+			this.outerConfiguration = in.readUTF();
+		}
+
+		@Override
+		protected boolean isOuterSnapshotCompatible(TestCompositeTypeSerializer newSerializer) {
+			return outerConfiguration.equals(newSerializer.getOuterConfiguration());
+		}
+
+		@Override
+		public int getCurrentOuterSnapshotVersion() {
+			return 1;
+		}
+	}
+
+	public enum TargetCompatibility {
+		COMPATIBLE_AS_IS,
+		COMPATIBLE_AFTER_MIGRATION,
+		COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
+		INCOMPATIBLE
+	}
+
+	/**
+	 * Used as nested serializers in the test composite serializer.
+	 * A nested serializer can be configured with a {@link TargetCompatibility},
+	 * which indicates what the result of the schema compatibility check should be
+	 * when a new instance of it is being checked for compatibility.
+	 */
+	public static class NestedSerializer extends TypeSerializer<String> {
+
+		private static final long serialVersionUID = -6175000932620623446L;
+
+		private static final StringSerializer delegateSerializer = StringSerializer.INSTANCE;
+
+		private final TargetCompatibility targetCompatibility;
+
+		NestedSerializer(TargetCompatibility targetCompatibility) {
+			this.targetCompatibility = targetCompatibility;
+		}
+
+		@Override
+		public TypeSerializerSnapshot<String> snapshotConfiguration() {
+			return new NestedSerializerSnapshot(targetCompatibility);
+		}
+
+		// --------------------------------------------------------------------------------
+		//  Serialization delegation
+		// --------------------------------------------------------------------------------
+
+		@Override
+		public String deserialize(String reuse, DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(reuse, source);
+		}
+
+		@Override
+		public String deserialize(DataInputView source) throws IOException {
+			return delegateSerializer.deserialize(source);
+		}
+
+		@Override
+		public void serialize(String record, DataOutputView target) throws IOException {
+			delegateSerializer.serialize(record, target);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			delegateSerializer.copy(source, target);
+		}
+
+		@Override
+		public String copy(String from) {
+			return delegateSerializer.copy(from);
+		}
+
+		@Override
+		public String copy(String from, String reuse) {
+			return delegateSerializer.copy(from, reuse);
+		}
+
+		@Override
+		public String createInstance() {
+			return delegateSerializer.createInstance();
+		}
+
+		@Override
+		public TypeSerializer<String> duplicate() {
+			return this;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (canEqual(obj)) {
+				return targetCompatibility == ((NestedSerializer) obj).targetCompatibility;
+			}
+			return false;
+		}
+
+		@Override
+		public int hashCode() {
+			return targetCompatibility.hashCode();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof NestedSerializer;
+		}
+	}
+
+	/**
+	 * Snapshot of the {@link NestedSerializer}.
+	 */
+	public static class NestedSerializerSnapshot implements TypeSerializerSnapshot<String> {
+
+		private TargetCompatibility targetCompatibility;
+
+		public NestedSerializerSnapshot() {}
+
+		public NestedSerializerSnapshot(TargetCompatibility targetCompatibility) {
+			this.targetCompatibility = targetCompatibility;
+		}
+
+		@Override
+		public void writeSnapshot(DataOutputView out) throws IOException {
+			out.writeInt(targetCompatibility.ordinal());
+		}
+
+		@Override
+		public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+			this.targetCompatibility = TargetCompatibility.values()[in.readInt()];
+		}
+
+		@Override
+		public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(TypeSerializer<String> newSerializer) {
+			// checks the exact class instead of using instanceof;
+			// this ensures that we get a new serializer, and not a ReconfiguredNestedSerializer or RestoredNestedSerializer
+			if (newSerializer.getClass() == NestedSerializer.class) {
+				switch (targetCompatibility) {
+					case COMPATIBLE_AS_IS:
+						return TypeSerializerSchemaCompatibility.compatibleAsIs();
+					case COMPATIBLE_AFTER_MIGRATION:
+						return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+					case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
+						return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+							new ReconfiguredNestedSerializer(targetCompatibility));
+					case INCOMPATIBLE:
+						return TypeSerializerSchemaCompatibility.incompatible();
+					default:
+						throw new IllegalStateException("Unexpected target compatibility.");
+				}
+			}
+
+			throw new IllegalArgumentException("Expected the new serializer to be of class " + NestedSerializer.class);
+		}
+
+		@Override
+		public TypeSerializer<String> restoreSerializer() {
+			return new RestoredNestedSerializer(targetCompatibility);
+		}
+
+		@Override
+		public int getCurrentVersion() {
+			return 1;
+		}
+	}
+
+	/**
+	 * A variant of the {@link NestedSerializer} used only when creating a reconfigured instance
+	 * of the serializer. This is used in tests as a tag to identify that the correct serializer
+	 * instances are being used.
+	 */
+	static class ReconfiguredNestedSerializer extends NestedSerializer {
+
+		private static final long serialVersionUID = -1396401178636869659L;
+
+		public ReconfiguredNestedSerializer(TargetCompatibility targetCompatibility) {
+			super(targetCompatibility);
+		}
+
+	}
+
+	/**
+	 * A variant of the {@link NestedSerializer} used only when creating a restored instance
+	 * of the serializer. This is used in tests as a tag to identify that the correct serializer
+	 * instances are being used.
+	 */
+	static class RestoredNestedSerializer extends NestedSerializer {
+
+		private static final long serialVersionUID = -1396401178636869659L;
+
+		public RestoredNestedSerializer(TargetCompatibility targetCompatibility) {
+			super(targetCompatibility);
+		}
+
+	}
+}


[flink] 08/12: [FLINK-11073] [core] Let ScalaEitherSerializerSnapshot be a CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68fab1277ca31ad65ddd594b0b6b7d4d9c0e4383
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:34:23 2018 +0800

    [FLINK-11073] [core] Let ScalaEitherSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../typeutils/ScalaEitherSerializerSnapshot.java   | 62 ++++++----------------
 .../api/scala/typeutils/EitherSerializer.scala     |  2 +-
 2 files changed, 17 insertions(+), 47 deletions(-)

diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
index b67e47b..26cfef5 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaEitherSerializerSnapshot.java
@@ -18,81 +18,51 @@
 
 package org.apache.flink.api.scala.typeutils;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
 
 import scala.util.Either;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Configuration snapshot for serializers of Scala's {@link Either} type,
  * containing configuration snapshots of the Left and Right serializers.
  */
-public class ScalaEitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> {
+public class ScalaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedLeftRightSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ScalaEitherSerializerSnapshot() {}
+	public ScalaEitherSerializerSnapshot() {
+		super(EitherSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public ScalaEitherSerializerSnapshot(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
-		Preconditions.checkNotNull(leftSerializer);
-		Preconditions.checkNotNull(rightSerializer);
-		this.nestedLeftRightSerializerSnapshot = new CompositeSerializerSnapshot(leftSerializer, rightSerializer);
+	public ScalaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) {
+		super(eitherSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Either<L, R>> restoreSerializer() {
-		return new EitherSerializer<>(
-			nestedLeftRightSerializerSnapshot.getRestoreSerializer(0),
-			nestedLeftRightSerializerSnapshot.getRestoreSerializer(1));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
-			TypeSerializer<Either<L, R>> newSerializer) {
-		checkState(nestedLeftRightSerializerSnapshot != null);
+	protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<L> leftSerializer = (TypeSerializer<L>) nestedSerializers[0];
 
-		if (newSerializer instanceof EitherSerializer) {
-			EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
+		@SuppressWarnings("unchecked")
+		TypeSerializer<R> rightSerializer = (TypeSerializer<R>) nestedSerializers[1];
 
-			return nestedLeftRightSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getLeftSerializer(),
-				serializer.getRightSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedLeftRightSerializerSnapshot.writeCompositeSnapshot(out);
+		return new EitherSerializer<>(leftSerializer, rightSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedLeftRightSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
 	}
 }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 68432a6..0427bb3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -125,7 +125,7 @@ class EitherSerializer[A, B](
   // --------------------------------------------------------------------------------------------
 
   override def snapshotConfiguration(): ScalaEitherSerializerSnapshot[A, B] = {
-    new ScalaEitherSerializerSnapshot[A, B](leftSerializer, rightSerializer)
+    new ScalaEitherSerializerSnapshot[A, B](this)
   }
 
   override def ensureCompatibility(


[flink] 10/12: [FLINK-11073] [core] Replace EitherSerializerSnapshot with new JavaEitherSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 301be5552290788864676b131e09741f769ef471
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Mon Jan 7 12:50:27 2019 +0100

    [FLINK-11073] [core] Replace EitherSerializerSnapshot with new JavaEitherSerializerSnapshot
---
 .../java/typeutils/runtime/EitherSerializer.java   |  4 +-
 .../runtime/EitherSerializerSnapshot.java          | 12 +++--
 .../runtime/JavaEitherSerializerSnapshot.java      | 61 ++++++++++++++++++++++
 ...mpositeTypeSerializerSnapshotMigrationTest.java |  4 +-
 4 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 3d4e8e9..0128640 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -205,7 +205,7 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public EitherSerializerSnapshot<L, R> snapshotConfiguration() {
-		return new EitherSerializerSnapshot<>(leftSerializer, rightSerializer);
+	public JavaEitherSerializerSnapshot<L, R> snapshotConfiguration() {
+		return new JavaEitherSerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
index 016fd04..3b7a8e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerSnapshot.java
@@ -35,8 +35,12 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Configuration snapshot for the {@link EitherSerializer}.
+ *
+ * @deprecated This snapshot class is no longer used by any serializer;
+ *             {@link JavaEitherSerializerSnapshot} is used instead.
  */
 @Internal
+@Deprecated
 public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnapshot<Either<L, R>> {
 
 	private static final int CURRENT_VERSION = 2;
@@ -110,12 +114,10 @@ public final class EitherSerializerSnapshot<L, R> implements TypeSerializerSnaps
 		checkState(nestedSnapshot != null);
 
 		if (newSerializer instanceof EitherSerializer) {
+			// NOTE(review): delegates to a new snapshot built from newSerializer itself; the restored nestedSnapshot is not consulted here - confirm
 			EitherSerializer<L, R> serializer = (EitherSerializer<L, R>) newSerializer;
-
-			return nestedSnapshot.resolveCompatibilityWithNested(
-					TypeSerializerSchemaCompatibility.compatibleAsIs(),
-					serializer.getLeftSerializer(),
-					serializer.getRightSerializer());
+			JavaEitherSerializerSnapshot<L, R> newSnapshot = new JavaEitherSerializerSnapshot<>(serializer);
+			return newSnapshot.resolveSchemaCompatibility(serializer);
 		}
 		else {
 			return TypeSerializerSchemaCompatibility.incompatible();
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
new file mode 100644
index 0000000..5036345
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaEitherSerializerSnapshot.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.Either;
+
+/**
+ * Snapshot class for the {@link EitherSerializer}.
+ */
+public class JavaEitherSerializerSnapshot<L, R> extends CompositeTypeSerializerSnapshot<Either<L, R>, EitherSerializer> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	/**
+	 * Constructor for read instantiation.
+	 */
+	@SuppressWarnings("unused")
+	public JavaEitherSerializerSnapshot() {
+		super(EitherSerializer.class);
+	}
+
+	/**
+	 * Constructor to create the snapshot for writing.
+	 */
+	public JavaEitherSerializerSnapshot(EitherSerializer<L, R> eitherSerializer) {
+		super(eitherSerializer);
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	protected EitherSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		return new EitherSerializer<>(nestedSerializers[0], nestedSerializers[1]);
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(EitherSerializer outerSerializer) {
+		return new TypeSerializer<?>[]{ outerSerializer.getLeftSerializer(), outerSerializer.getRightSerializer() };
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index c6b49a4..62135d7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
-import org.apache.flink.api.java.typeutils.runtime.EitherSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot;
 import org.apache.flink.types.Either;
 
 import org.junit.runner.RunWith;
@@ -48,7 +48,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 
 		// Either<String, Integer>
 
-		final TestSpecification<Either<String, Integer>> either = TestSpecification.<Either<String, Integer>>builder("1.6-either", EitherSerializer.class, EitherSerializerSnapshot.class)
+		final TestSpecification<Either<String, Integer>> either = TestSpecification.<Either<String, Integer>>builder("1.6-either", EitherSerializer.class, JavaEitherSerializerSnapshot.class)
 			.withSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
 			.withTestData("flink-1.6-either-type-serializer-data", 10);


[flink] 04/12: [FLINK-11073] [core] Let ArrayListSerializerSnapshot be a CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 383cc9e1875c2b19bb362b3af025547cc2b1dfbf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:14:20 2018 +0800

    [FLINK-11073] [core] Let ArrayListSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../flink/runtime/state/ArrayListSerializer.java   |  2 +-
 .../runtime/state/ArrayListSerializerSnapshot.java | 54 ++++++----------------
 2 files changed, 15 insertions(+), 41 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 6fa9f02..d442d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -147,7 +147,7 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 
 	@Override
 	public TypeSerializerSnapshot<ArrayList<T>> snapshotConfiguration() {
-		return new ArrayListSerializerSnapshot<>(elementSerializer);
+		return new ArrayListSerializerSnapshot<>(this);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
index 7fc8c51..dde8d1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
@@ -18,72 +18,46 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 
-import java.io.IOException;
 import java.util.ArrayList;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link ArrayListSerializer}.
  */
-public class ArrayListSerializerSnapshot<T> implements TypeSerializerSnapshot<ArrayList<T>> {
+public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ArrayListSerializerSnapshot() {}
+	public ArrayListSerializerSnapshot() {
+		super(ArrayListSerializer.class);
+	}
 
 	/**
 	 * Constructor for creating the snapshot for writing.
 	 */
-	public ArrayListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(elementSerializer);
+	public ArrayListSerializerSnapshot(ArrayListSerializer<T> arrayListSerializer) {
+		super(arrayListSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<ArrayList<T>> restoreSerializer() {
-		return new ArrayListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<ArrayList<T>> resolveSchemaCompatibility(TypeSerializer<ArrayList<T>> newSerializer) {
-		checkState(nestedElementSerializerSnapshot != null);
-
-		if (newSerializer instanceof ArrayListSerializer) {
-			ArrayListSerializer<T> serializer = (ArrayListSerializer<T>) newSerializer;
-
-			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getElementSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+	protected ArrayListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
+		return new ArrayListSerializer<>(elementSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
 	}
 }


[flink] 07/12: [FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e6664dfea9668c178047ebfc782278a176afaaf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:29:52 2018 +0800

    [FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../api/common/typeutils/base/MapSerializer.java   |  2 +-
 .../base/MapSerializerConfigSnapshot.java          |  3 +-
 .../typeutils/base/MapSerializerSnapshot.java      | 60 ++++++----------------
 3 files changed, 18 insertions(+), 47 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index dd3b81b..bedaf69 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -202,6 +202,6 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 
 	@Override
 	public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
-		return new MapSerializerSnapshot<>(keySerializer, valueSerializer);
+		return new MapSerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 000924f..2b78b52 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -50,8 +50,7 @@ public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerial
 			// redirect the compatibility check to the new MapSerializerConfigSnapshot
 			MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer;
 
-			MapSerializerSnapshot<K, V> mapSerializerSnapshot =
-				new MapSerializerSnapshot<>(mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer());
+			MapSerializerSnapshot<K, V> mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer);
 			return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
 		}
 		else {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
index be2e4b0..a6db0ef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
@@ -18,78 +18,50 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link MapSerializer}.
  */
-public class MapSerializerSnapshot<K, V> implements TypeSerializerSnapshot<Map<K, V>> {
+public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedKeyValueSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public MapSerializerSnapshot() {}
+	public MapSerializerSnapshot() {
+		super(MapSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public MapSerializerSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
-		Preconditions.checkNotNull(keySerializer);
-		Preconditions.checkNotNull(valueSerializer);
-		this.nestedKeyValueSerializerSnapshot = new CompositeSerializerSnapshot(keySerializer, valueSerializer);
+	public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
+		super(mapSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Map<K, V>> restoreSerializer() {
-		return new MapSerializer<>(
-			nestedKeyValueSerializerSnapshot.getRestoreSerializer(0),
-			nestedKeyValueSerializerSnapshot.getRestoreSerializer(1));
-	}
+	protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
 
-	@Override
-	public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
-		checkState(nestedKeyValueSerializerSnapshot != null);
+		@SuppressWarnings("unchecked")
+		TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
 
-		if (newSerializer instanceof MapSerializer) {
-			MapSerializer<K, V> serializer = (MapSerializer<K, V>) newSerializer;
-
-			return nestedKeyValueSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getKeySerializer(),
-				serializer.getValueSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedKeyValueSerializerSnapshot.writeCompositeSnapshot(out);
+		return new MapSerializer<>(keySerializer, valueSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedKeyValueSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
 	}
 }


[flink] 05/12: [FLINK-11073] [core] Let LockableTypeSerializerSnapshot be a CompositeTypeSerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eda73763dfad7c7367f03556134637cf8edc3160
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:22:01 2018 +0800

    [FLINK-11073] [core] Let LockableTypeSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../flink/cep/nfa/sharedbuffer/Lockable.java       |  2 +-
 .../LockableTypeSerializerSnapshot.java            | 56 ++++++----------------
 2 files changed, 15 insertions(+), 43 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index 7afbc50..ae1452b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -187,7 +187,7 @@ public final class Lockable<T> {
 
 		@Override
 		public TypeSerializerSnapshot<Lockable<E>> snapshotConfiguration() {
-			return new LockableTypeSerializerSnapshot<>(elementSerializer);
+			return new LockableTypeSerializerSnapshot<>(this);
 		}
 
 		/**
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
index 44a4670..13867ac 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/LockableTypeSerializerSnapshot.java
@@ -19,74 +19,46 @@
 package org.apache.flink.cep.nfa.sharedbuffer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A {@link TypeSerializerSnapshot} for the {@link Lockable.LockableTypeSerializer}.
  */
 @Internal
-public class LockableTypeSerializerSnapshot<E> implements TypeSerializerSnapshot<Lockable<E>> {
+public class LockableTypeSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Lockable<E>, Lockable.LockableTypeSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public LockableTypeSerializerSnapshot() {}
+	public LockableTypeSerializerSnapshot() {
+		super(Lockable.LockableTypeSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public LockableTypeSerializerSnapshot(TypeSerializer<E> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(elementSerializer));
+	public LockableTypeSerializerSnapshot(Lockable.LockableTypeSerializer<E> lockableTypeSerializer) {
+		super(lockableTypeSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Lockable<E>> restoreSerializer() {
-		return new Lockable.LockableTypeSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
+	protected Lockable.LockableTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<E> elementSerializer = (TypeSerializer<E>) nestedSerializers[0];
+		return new Lockable.LockableTypeSerializer<>(elementSerializer);
 	}
 
 	@Override
-	public TypeSerializerSchemaCompatibility<Lockable<E>> resolveSchemaCompatibility(TypeSerializer<Lockable<E>> newSerializer) {
-		checkState(nestedElementSerializerSnapshot != null);
-
-		if (newSerializer instanceof Lockable.LockableTypeSerializer) {
-			Lockable.LockableTypeSerializer<E> serializer = (Lockable.LockableTypeSerializer<E>) newSerializer;
-
-			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getElementSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
+	protected TypeSerializer<?>[] getNestedSerializers(Lockable.LockableTypeSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
 	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
-	}
-
-	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
-	}
-
 }


[flink] 09/12: [FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e791b1a29b675af4f290be9b68ae7f03b2de43c5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Dec 6 19:07:16 2018 +0800

    [FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot
---
 .../typeutils/base/GenericArraySerializer.java     |  4 +-
 .../base/GenericArraySerializerConfigSnapshot.java | 20 +++---
 .../base/GenericArraySerializerSnapshot.java       | 81 ++++++++++++++++++++++
 ...mpositeTypeSerializerSnapshotMigrationTest.java |  4 +-
 4 files changed, 94 insertions(+), 15 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 55ba8ab..a4949fb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -206,7 +206,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
-		return new GenericArraySerializerConfigSnapshot<>(this);
+	public GenericArraySerializerSnapshot<C> snapshotConfiguration() {
+		return new GenericArraySerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index b0aa241..8cbe76c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -38,8 +38,12 @@ import static org.apache.flink.util.Preconditions.checkState;
  * Point-in-time configuration of a {@link GenericArraySerializer}.
  *
  * @param <C> The component type.
+ *
+ * @deprecated this is deprecated and no longer used by the {@link GenericArraySerializer}.
+ *             It has been replaced by {@link GenericArraySerializerSnapshot}.
  */
 @Internal
+@Deprecated
 public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> {
 
 	private static final int CURRENT_VERSION = 2;
@@ -118,18 +122,12 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 
 	@Override
 	public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
-		checkState(componentClass != null && nestedSnapshot != null);
-
 		if (newSerializer instanceof GenericArraySerializer) {
-			GenericArraySerializer<C> serializer = (GenericArraySerializer<C>) newSerializer;
-			TypeSerializerSchemaCompatibility<C> compat = serializer.getComponentClass() == componentClass ?
-					TypeSerializerSchemaCompatibility.compatibleAsIs() :
-					TypeSerializerSchemaCompatibility.incompatible();
-
-			return nestedSnapshot.resolveCompatibilityWithNested(
-					compat, serializer.getComponentSerializer());
-		}
-		else {
+			// delegate to the new snapshot class
+			GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
+			GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
+			return newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
+		} else {
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
new file mode 100644
index 0000000..3f54dee
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * Snapshot of a {@link GenericArraySerializer}: nests the component serializer and writes the component class name as outer state.
+ * NOTE(review): nothing in this class compares {@code componentClass} with a new serializer's component class - confirm that is intended.
+ * @param <C> The component type.
+ */
+public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {
+
+	private static final int CURRENT_VERSION = 1; // value reported by getCurrentOuterSnapshotVersion()
+
+	private Class<C> componentClass; // set by the write constructor or by readOuterSnapshot; unset after the no-arg constructor
+
+	/**
+	 * Constructor to be used for read instantiation; {@code componentClass} stays unset until {@code readOuterSnapshot} assigns it.
+	 */
+	public GenericArraySerializerSnapshot() {
+		super(GenericArraySerializer.class);
+	}
+
+	/**
+	 * Constructor to be used for writing the snapshot; captures the component class of the given serializer.
+	 */
+	public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
+		super(genericArraySerializer);
+		this.componentClass = genericArraySerializer.getComponentClass();
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+		out.writeUTF(componentClass.getName()); // outer state is just the component class name
+	}
+
+	@Override
+	protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); // presumably reads back the name written by writeOuterSnapshot - TODO confirm
+	}
+
+	@Override
+	protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0]; // the only nested serializer, see getNestedSerializers
+		return new GenericArraySerializer<>(componentClass, componentSerializer);
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() }; // single entry: the component serializer
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index c7b002a..c6b49a4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
@@ -55,7 +55,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 
 		// GenericArray<String>
 
-		final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerConfigSnapshot.class)
+		final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
 			.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
 			.withTestData("flink-1.6-array-type-serializer-data", 10);