You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:20 UTC

[flink] 07/12: [FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e6664dfea9668c178047ebfc782278a176afaaf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:29:52 2018 +0800

    [FLINK-11073] [core] Let MapSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../api/common/typeutils/base/MapSerializer.java   |  2 +-
 .../base/MapSerializerConfigSnapshot.java          |  3 +-
 .../typeutils/base/MapSerializerSnapshot.java      | 60 ++++++----------------
 3 files changed, 18 insertions(+), 47 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index dd3b81b..bedaf69 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -202,6 +202,6 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> {
 
 	@Override
 	public TypeSerializerSnapshot<Map<K, V>> snapshotConfiguration() {
-		return new MapSerializerSnapshot<>(keySerializer, valueSerializer);
+		return new MapSerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
index 000924f..2b78b52 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerConfigSnapshot.java
@@ -50,8 +50,7 @@ public final class MapSerializerConfigSnapshot<K, V> extends CompositeTypeSerial
 			// redirect the compatibility check to the new MapSerializerConfigSnapshot
 			MapSerializer<K, V> mapSerializer = (MapSerializer<K, V>) newSerializer;
 
-			MapSerializerSnapshot<K, V> mapSerializerSnapshot =
-				new MapSerializerSnapshot<>(mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer());
+			MapSerializerSnapshot<K, V> mapSerializerSnapshot = new MapSerializerSnapshot<>(mapSerializer);
 			return mapSerializerSnapshot.resolveSchemaCompatibility(newSerializer);
 		}
 		else {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
index be2e4b0..a6db0ef 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
@@ -18,78 +18,50 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
 
-import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link MapSerializer}.
  */
-public class MapSerializerSnapshot<K, V> implements TypeSerializerSnapshot<Map<K, V>> {
+public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedKeyValueSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public MapSerializerSnapshot() {}
+	public MapSerializerSnapshot() {
+		super(MapSerializer.class);
+	}
 
 	/**
 	 * Constructor to create the snapshot for writing.
 	 */
-	public MapSerializerSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
-		Preconditions.checkNotNull(keySerializer);
-		Preconditions.checkNotNull(valueSerializer);
-		this.nestedKeyValueSerializerSnapshot = new CompositeSerializerSnapshot(keySerializer, valueSerializer);
+	public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
+		super(mapSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<Map<K, V>> restoreSerializer() {
-		return new MapSerializer<>(
-			nestedKeyValueSerializerSnapshot.getRestoreSerializer(0),
-			nestedKeyValueSerializerSnapshot.getRestoreSerializer(1));
-	}
+	protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
 
-	@Override
-	public TypeSerializerSchemaCompatibility<Map<K, V>> resolveSchemaCompatibility(TypeSerializer<Map<K, V>> newSerializer) {
-		checkState(nestedKeyValueSerializerSnapshot != null);
+		@SuppressWarnings("unchecked")
+		TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
 
-		if (newSerializer instanceof MapSerializer) {
-			MapSerializer<K, V> serializer = (MapSerializer<K, V>) newSerializer;
-
-			return nestedKeyValueSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getKeySerializer(),
-				serializer.getValueSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedKeyValueSerializerSnapshot.writeCompositeSnapshot(out);
+		return new MapSerializer<>(keySerializer, valueSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedKeyValueSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
 	}
 }