You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:16 UTC

[flink] 03/12: [FLINK-11073] [core] Let MapViewSerializerSnapshot be a CompositeTypeSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9822cc8a73cc4a54cb401d3ee31ce2e8894a6d6d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:11:11 2018 +0800

    [FLINK-11073] [core] Let MapViewSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../table/dataview/MapViewSerializerSnapshot.java  | 55 ++++++----------------
 .../flink/table/dataview/MapViewSerializer.scala   |  2 +-
 2 files changed, 15 insertions(+), 42 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index f59fc0a..132f42f 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -18,78 +18,51 @@
 
 package org.apache.flink.table.dataview;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.table.api.dataview.MapView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A {@link TypeSerializerSnapshot} for the {@link MapViewSerializer}.
  *
  * @param <K> the key type of the map entries.
  * @param <V> the value type of the map entries.
  */
-public class MapViewSerializerSnapshot<K, V> implements TypeSerializerSnapshot<MapView<K, V>> {
+public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedMapSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public MapViewSerializerSnapshot() {}
+	public MapViewSerializerSnapshot() {
+		super(MapViewSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public MapViewSerializerSnapshot(TypeSerializer<Map<K, V>> mapSerializer) {
-		this.nestedMapSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(mapSerializer));
+	public MapViewSerializerSnapshot(MapViewSerializer<K, V> mapViewSerializer) {
+		super(mapViewSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<MapView<K, V>> restoreSerializer() {
-		return new MapViewSerializer<>(nestedMapSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<MapView<K, V>> resolveSchemaCompatibility(
-			TypeSerializer<MapView<K, V>> newSerializer) {
-		checkState(nestedMapSerializerSnapshot != null);
-
-		if (newSerializer instanceof MapViewSerializer) {
-			MapViewSerializer<K, V> serializer = (MapViewSerializer<K, V>) newSerializer;
-
-			return nestedMapSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getMapSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedMapSerializerSnapshot.writeCompositeSnapshot(out);
+	protected MapViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<Map<K, V>> mapSerializer = (TypeSerializer<Map<K, V>>) nestedSerializers[0];
+		return new MapViewSerializer<>(mapSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedMapSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getMapSerializer() };
 	}
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
index 89cdf70..e0067c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
@@ -78,7 +78,7 @@ class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K,
     mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
 
   override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
-    new MapViewSerializerSnapshot[K, V](mapSerializer)
+    new MapViewSerializerSnapshot[K, V](this)
 
   // copy and modified from MapSerializer.ensureCompatibility
   override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot[_])