You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:17 UTC

[flink] 04/12: [FLINK-11073] [core] Let ArrayListSerializerSnapshot be a CompositeTypeSerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 383cc9e1875c2b19bb362b3af025547cc2b1dfbf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:14:20 2018 +0800

    [FLINK-11073] [core] Let ArrayListSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
 .../flink/runtime/state/ArrayListSerializer.java   |  2 +-
 .../runtime/state/ArrayListSerializerSnapshot.java | 54 ++++++----------------
 2 files changed, 15 insertions(+), 41 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 6fa9f02..d442d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -147,7 +147,7 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 
 	@Override
 	public TypeSerializerSnapshot<ArrayList<T>> snapshotConfiguration() {
-		return new ArrayListSerializerSnapshot<>(elementSerializer);
+		return new ArrayListSerializerSnapshot<>(this);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
index 7fc8c51..dde8d1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
@@ -18,72 +18,46 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 
-import java.io.IOException;
 import java.util.ArrayList;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * Snapshot class for the {@link ArrayListSerializer}.
  */
-public class ArrayListSerializerSnapshot<T> implements TypeSerializerSnapshot<ArrayList<T>> {
+public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer> {
 
 	private static final int CURRENT_VERSION = 1;
 
-	private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
 	/**
 	 * Constructor for read instantiation.
 	 */
-	public ArrayListSerializerSnapshot() {}
+	public ArrayListSerializerSnapshot() {
+		super(ArrayListSerializer.class);
+	}
 
 	/**
 	 * Constructor for creating the snapshot for writing.
 	 */
-	public ArrayListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
-		this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(elementSerializer);
+	public ArrayListSerializerSnapshot(ArrayListSerializer<T> arrayListSerializer) {
+		super(arrayListSerializer);
 	}
 
 	@Override
-	public int getCurrentVersion() {
+	public int getCurrentOuterSnapshotVersion() {
 		return CURRENT_VERSION;
 	}
 
 	@Override
-	public TypeSerializer<ArrayList<T>> restoreSerializer() {
-		return new ArrayListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
-	}
-
-	@Override
-	public TypeSerializerSchemaCompatibility<ArrayList<T>> resolveSchemaCompatibility(TypeSerializer<ArrayList<T>> newSerializer) {
-		checkState(nestedElementSerializerSnapshot != null);
-
-		if (newSerializer instanceof ArrayListSerializer) {
-			ArrayListSerializer<T> serializer = (ArrayListSerializer<T>) newSerializer;
-
-			return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
-				TypeSerializerSchemaCompatibility.compatibleAsIs(),
-				serializer.getElementSerializer());
-		}
-		else {
-			return TypeSerializerSchemaCompatibility.incompatible();
-		}
-	}
-
-	@Override
-	public void writeSnapshot(DataOutputView out) throws IOException {
-		nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+	protected ArrayListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
+		return new ArrayListSerializer<>(elementSerializer);
 	}
 
 	@Override
-	public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
-		this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+	protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
 	}
 }