You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:19 UTC

[flink] 06/12: [FLINK-11073] [core] Let ListViewSerializerSnapshot be a CompositeTypeSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faf093d33284b3a0e9e7b7ba7267b91503b69891
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:26:13 2018 +0800

    [FLINK-11073] [core] Let ListViewSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../table/dataview/ListViewSerializerSnapshot.java | 54 ++++++----------------
 .../flink/table/dataview/ListViewSerializer.scala  |  2 +-
 2 files changed, 15 insertions(+), 41 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index cca84d2..90468ac 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -18,76 +18,50 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.ListView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.List;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link TypeSerializerSnapshot} for the {@link ListViewSerializer}.
  *
  * @param <T> the type of the list elements.
  */
-public final class ListViewSerializerSnapshot<T> implements TypeSerializerSnapshot<ListView<T>> {
+public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedListSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ListViewSerializerSnapshot() {}
+	public ListViewSerializerSnapshot() {
+		super(ListViewSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public ListViewSerializerSnapshot(TypeSerializer<List<T>> listSerializer) {
-		this.nestedListSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(listSerializer));
+	public ListViewSerializerSnapshot(ListViewSerializer<T> listViewSerializer) {
+		super(listViewSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<ListView<T>> restoreSerializer() {
-		return new ListViewSerializer<>(nestedListSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<ListView<T>> resolveSchemaCompatibility(TypeSerializer<ListView<T>> newSerializer) {
-		checkState(nestedListSerializerSnapshot != null);
-
-		if (newSerializer instanceof ListViewSerializer) {
-			ListViewSerializer<T> serializer = (ListViewSerializer<T>) newSerializer;
-
-			return nestedListSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getListSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedListSerializerSnapshot.writeCompositeSnapshot(out);
+	protected ListViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<List<T>> listSerializer = (TypeSerializer<List<T>>) nestedSerializers[0];
+		return new ListViewSerializer<>(listSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedListSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getListSerializer() };
 	}
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
index 246af6c..2d48c3d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
@@ -77,7 +77,7 @@ class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]
     listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
 
   override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
-    new ListViewSerializerSnapshot[T](listSerializer)
+    new ListViewSerializerSnapshot[T](this)
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[ListView[T]] = {