You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:17 UTC
[flink] 04/12: [FLINK-11073] [core] Let ArrayListSerializerSnapshot be a CompositeTypeSerializerSnapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 383cc9e1875c2b19bb362b3af025547cc2b1dfbf
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:14:20 2018 +0800
[FLINK-11073] [core] Let ArrayListSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
.../flink/runtime/state/ArrayListSerializer.java | 2 +-
.../runtime/state/ArrayListSerializerSnapshot.java | 54 ++++++----------------
2 files changed, 15 insertions(+), 41 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 6fa9f02..d442d0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -147,7 +147,7 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
@Override
public TypeSerializerSnapshot<ArrayList<T>> snapshotConfiguration() {
- return new ArrayListSerializerSnapshot<>(elementSerializer);
+ return new ArrayListSerializerSnapshot<>(this);
}
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
index 7fc8c51..dde8d1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializerSnapshot.java
@@ -18,72 +18,46 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import java.io.IOException;
import java.util.ArrayList;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* Snapshot class for the {@link ArrayListSerializer}.
*/
-public class ArrayListSerializerSnapshot<T> implements TypeSerializerSnapshot<ArrayList<T>> {
+public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer> {
private static final int CURRENT_VERSION = 1;
- private CompositeSerializerSnapshot nestedElementSerializerSnapshot;
-
/**
* Constructor for read instantiation.
*/
- public ArrayListSerializerSnapshot() {}
+ public ArrayListSerializerSnapshot() {
+ super(ArrayListSerializer.class);
+ }
/**
* Constructor for creating the snapshot for writing.
*/
- public ArrayListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
- this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(elementSerializer);
+ public ArrayListSerializerSnapshot(ArrayListSerializer<T> arrayListSerializer) {
+ super(arrayListSerializer);
}
@Override
- public int getCurrentVersion() {
+ public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}
@Override
- public TypeSerializer<ArrayList<T>> restoreSerializer() {
- return new ArrayListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
- }
-
- @Override
- public TypeSerializerSchemaCompatibility<ArrayList<T>> resolveSchemaCompatibility(TypeSerializer<ArrayList<T>> newSerializer) {
- checkState(nestedElementSerializerSnapshot != null);
-
- if (newSerializer instanceof ArrayListSerializer) {
- ArrayListSerializer<T> serializer = (ArrayListSerializer<T>) newSerializer;
-
- return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
- TypeSerializerSchemaCompatibility.compatibleAsIs(),
- serializer.getElementSerializer());
- }
- else {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
- }
-
- @Override
- public void writeSnapshot(DataOutputView out) throws IOException {
- nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
+ protected ArrayListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked")
+ TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
+ return new ArrayListSerializer<>(elementSerializer);
}
@Override
- public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
- this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+ protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer outerSerializer) {
+ return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
}
}