You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:16 UTC
[flink] 03/12: [FLINK-11073] [core] Let MapViewSerializerSnapshot
be a CompositeTypeSerializerSnapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9822cc8a73cc4a54cb401d3ee31ce2e8894a6d6d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Wed Dec 5 16:11:11 2018 +0800
[FLINK-11073] [core] Let MapViewSerializerSnapshot be a CompositeTypeSerializerSnapshot
---
.../table/dataview/MapViewSerializerSnapshot.java | 55 ++++++----------------
.../flink/table/dataview/MapViewSerializer.scala | 2 +-
2 files changed, 15 insertions(+), 42 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
index f59fc0a..132f42f 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/dataview/MapViewSerializerSnapshot.java
@@ -18,78 +18,51 @@
package org.apache.flink.table.dataview;
-import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.table.api.dataview.MapView;
-import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.util.Map;
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
* A {@link TypeSerializerSnapshot} for the {@link MapViewSerializer}.
*
* @param <K> the key type of the map entries.
* @param <V> the value type of the map entries.
*/
-public class MapViewSerializerSnapshot<K, V> implements TypeSerializerSnapshot<MapView<K, V>> {
+public class MapViewSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<MapView<K, V>, MapViewSerializer> {
private static final int CURRENT_VERSION = 1;
- private CompositeSerializerSnapshot nestedMapSerializerSnapshot;
-
/**
* Constructor for read instantiation.
*/
- public MapViewSerializerSnapshot() {}
+ public MapViewSerializerSnapshot() {
+ super(MapViewSerializer.class);
+ }
/**
* Constructor to create the snapshot for writing.
*/
- public MapViewSerializerSnapshot(TypeSerializer<Map<K, V>> mapSerializer) {
- this.nestedMapSerializerSnapshot = new CompositeSerializerSnapshot(Preconditions.checkNotNull(mapSerializer));
+ public MapViewSerializerSnapshot(MapViewSerializer<K, V> mapViewSerializer) {
+ super(mapViewSerializer);
}
@Override
- public int getCurrentVersion() {
+ public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}
@Override
- public TypeSerializer<MapView<K, V>> restoreSerializer() {
- return new MapViewSerializer<>(nestedMapSerializerSnapshot.getRestoreSerializer(0));
- }
-
- @Override
- public TypeSerializerSchemaCompatibility<MapView<K, V>> resolveSchemaCompatibility(
- TypeSerializer<MapView<K, V>> newSerializer) {
- checkState(nestedMapSerializerSnapshot != null);
-
- if (newSerializer instanceof MapViewSerializer) {
- MapViewSerializer<K, V> serializer = (MapViewSerializer<K, V>) newSerializer;
-
- return nestedMapSerializerSnapshot.resolveCompatibilityWithNested(
- TypeSerializerSchemaCompatibility.compatibleAsIs(),
- serializer.getMapSerializer());
- }
- else {
- return TypeSerializerSchemaCompatibility.incompatible();
- }
- }
-
- @Override
- public void writeSnapshot(DataOutputView out) throws IOException {
- nestedMapSerializerSnapshot.writeCompositeSnapshot(out);
+ protected MapViewSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Map<K, V>> mapSerializer = (TypeSerializer<Map<K, V>>) nestedSerializers[0];
+ return new MapViewSerializer<>(mapSerializer);
}
@Override
- public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
- this.nestedMapSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
+ protected TypeSerializer<?>[] getNestedSerializers(MapViewSerializer outerSerializer) {
+ return new TypeSerializer<?>[] { outerSerializer.getMapSerializer() };
}
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
index 89cdf70..e0067c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
@@ -78,7 +78,7 @@ class MapViewSerializer[K, V](val mapSerializer: TypeSerializer[java.util.Map[K,
mapSerializer.equals(obj.asInstanceOf[MapViewSerializer[_, _]].mapSerializer)
override def snapshotConfiguration(): MapViewSerializerSnapshot[K, V] =
- new MapViewSerializerSnapshot[K, V](mapSerializer)
+ new MapViewSerializerSnapshot[K, V](this)
// copy and modified from MapSerializer.ensureCompatibility
override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot[_])