You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:19 UTC
[flink] 06/12: [FLINK-11073] [core] Let ListViewSerializerSnapshot
be a CompositeTypeSerializerSnapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit faf093d33284b3a0e9e7b7ba7267b91503b69891
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:26:13 2018 +0800
[FLINK-11073] [core] Let ListViewSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
.../table/dataview/ListViewSerializerSnapshot.java | 54 ++++++----------------
.../flink/table/dataview/ListViewSerializer.scala | 2 +-
2 files changed, 15 insertions(+), 41 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
index cca84d2..90468ac 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/ListViewSerializerSnapshot.java
@@ -18,76 +18,50 @@
package org.apache.flink.table.dataview;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.api.dataview.ListView;
-import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.util.List;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* A {@link TypeSerializerSnapshot} for the {@link ListViewSerializer}.
*
* @param <T> the type of the list elements.
*/
-public final class ListViewSerializerSnapshot<T> implements TypeSerializerSnapshot<ListView<T>> {
+public final class ListViewSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ListView<T>, ListViewSerializer> {
private static final int CURRENT_VERSION = 1;
- private CompositeSerializerSnapshot nestedListSerializerSnapshot;
-
/**
* Constructor for read instantiation.
*/
- public ListViewSerializerSnapshot() {}
+ public ListViewSerializerSnapshot() {
+ super(ListViewSerializer.class);
+ }
/**
* Constructor to create the snapshot for writing.
*/
- public ListViewSerializerSnapshot(TypeSerializer<List<T>> listSerializer) {
- this.nestedListSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(listSerializer));
+ public ListViewSerializerSnapshot(ListViewSerializer<T> listViewSerializer) {
+ super(listViewSerializer);
}
@Override
- public int getCurrentVersion() {
+ public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}
@Override
- public TypeSerializer<ListView<T>> restoreSerializer() {
- return new ListViewSerializer<>(nestedListSerializerSnapshot.getRestoreSerializer(0));
- }
-
- @Override
- public TypeSerializerSchemaCompatibility<ListView<T>> resolveSchemaCompatibility(TypeSerializer<ListView<T>> newSerializer) {
- checkState(nestedListSerializerSnapshot != null);
-
- if (newSerializer instanceof ListViewSerializer) {
- ListViewSerializer<T> serializer = (ListViewSerializer<T>) newSerializer;
-
- return nestedListSerializerSnapshot.resolveCompatibilityWithNested(
- TypeSerializerSchemaCompatibility.compatibleAsIs(),
- serializer.getListSerializer());
- }
- else {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
- }
-
- @Override
- public void writeSnapshot(DataOutputView out) throws IOException {
- nestedListSerializerSnapshot.writeCompositeSnapshot(out);
+ protected ListViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked")
+ TypeSerializer<List<T>> listSerializer = (TypeSerializer<List<T>>) nestedSerializers[0];
+ return new ListViewSerializer<>(listSerializer);
}
@Override
- public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
- this.nestedListSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+ protected TypeSerializer<?>[] getNestedSerializers(ListViewSerializer outerSerializer) {
+ return new TypeSerializer<?>[] { outerSerializer.getListSerializer() };
}
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
index 246af6c..2d48c3d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
@@ -77,7 +77,7 @@ class ListViewSerializer[T](val listSerializer: TypeSerializer[java.util.List[T]
listSerializer.equals(obj.asInstanceOf[ListViewSerializer[_]].listSerializer)
override def snapshotConfiguration(): ListViewSerializerSnapshot[T] =
- new ListViewSerializerSnapshot[T](listSerializer)
+ new ListViewSerializerSnapshot[T](this)
override def ensureCompatibility(
configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[ListView[T]] = {