You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/12/11 09:47:53 UTC

[flink] 08/15: [hotfix] Cleanup unused methods / appropriate method renames in StateMetaInfoSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 34ca8b772f9ebb152210692d12295db78c534e17
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Sun Dec 9 16:10:31 2018 +0800

    [hotfix] Cleanup unused methods / appropriate method renames in StateMetaInfoSnapshot
    
    This commit removes the `restoreTypeSerializer(...)` method.
    That method is no longer used after the series of changes in
    FLINK-11094. This also corresponds to the new principle that the restore
    serializer is only accessed, when the state backends attempt request it
    from their state meta infos. We do not create restore serializers
    eagerly when creating meta infos from a StateMetaInfoSnapshot.
    
    It also removes "config" from names of methods and fields
    related to serializer snapshotting.
    This corresponds to the abstraction rework of retiring
    TypeSerializerConfigSnapshot to be replaced by TypeSerializerSnapshot.
    The related fields / methods should not mention "config" anymore.
---
 .../RegisteredBroadcastStateBackendMetaInfo.java   |  4 +--
 .../RegisteredKeyValueStateBackendMetaInfo.java    |  4 +--
 .../RegisteredOperatorStateBackendMetaInfo.java    |  2 +-
 ...egisteredPriorityQueueStateBackendMetaInfo.java |  2 +-
 .../state/metainfo/StateMetaInfoSnapshot.java      | 33 ++++++++--------------
 .../StateMetaInfoSnapshotReadersWriters.java       |  2 +-
 .../runtime/state/SerializationProxiesTest.java    |  4 +--
 7 files changed, 20 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
index 95a650e..ecc13fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
@@ -75,10 +75,10 @@ public class RegisteredBroadcastStateBackendMetaInfo<K, V> extends RegisteredSta
 				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
 			StateSerializerProvider.fromRestoredState(
 				(TypeSerializerSnapshot<K>) Preconditions.checkNotNull(
-					snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER))),
+					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER))),
 			StateSerializerProvider.fromRestoredState(
 				(TypeSerializerSnapshot<V>) Preconditions.checkNotNull(
-					snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
+					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
 
 		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index ebe8e94..b37c79d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -87,10 +87,10 @@ public class RegisteredKeyValueStateBackendMetaInfo<N, S> extends RegisteredStat
 			snapshot.getName(),
 			StateSerializerProvider.fromRestoredState(
 				(TypeSerializerSnapshot<N>) Preconditions.checkNotNull(
-					snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER))),
+					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER))),
 			StateSerializerProvider.fromRestoredState(
 				(TypeSerializerSnapshot<S>) Preconditions.checkNotNull(
-					snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
+					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
 			null);
 
 		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.KEY_VALUE == snapshot.getBackendStateType());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
index afb3d77..921947a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
@@ -73,7 +73,7 @@ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMe
 			snapshot.getName(),
 			StateSerializerProvider.fromRestoredState(
 				(TypeSerializerSnapshot<S>) Preconditions.checkNotNull(
-					snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
+					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))),
 			OperatorStateHandle.Mode.valueOf(
 				snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
index 60c88e3..961d96f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredPriorityQueueStateBackendMetaInfo.java
@@ -51,7 +51,7 @@ public class RegisteredPriorityQueueStateBackendMetaInfo<T> extends RegisteredSt
 			snapshot.getName(),
 			StateSerializerProvider.fromRestoredState(
 				(TypeSerializerSnapshot<T>) Preconditions.checkNotNull(
-					snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
+					snapshot.getTypeSerializerSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))));
 
 		Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE == snapshot.getBackendStateType());
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
index 1e9d919..9b05500 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
@@ -81,7 +81,7 @@ public class StateMetaInfoSnapshot {
 
 	/** The configurations of all the type serializers used with the state. */
 	@Nonnull
-	private final Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshots;
+	private final Map<String, TypeSerializerSnapshot<?>> serializerSnapshots;
 
 	// TODO this will go away once all serializers have the restoreSerializer() factory method properly implemented.
 	/** The serializers used by the state. */
@@ -92,8 +92,8 @@ public class StateMetaInfoSnapshot {
 		@Nonnull String name,
 		@Nonnull BackendStateType backendStateType,
 		@Nonnull Map<String, String> options,
-		@Nonnull Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshots) {
-		this(name, backendStateType, options, serializerConfigSnapshots, new HashMap<>());
+		@Nonnull Map<String, TypeSerializerSnapshot<?>> serializerSnapshots) {
+		this(name, backendStateType, options, serializerSnapshots, new HashMap<>());
 	}
 
 	/**
@@ -106,12 +106,12 @@ public class StateMetaInfoSnapshot {
 		@Nonnull String name,
 		@Nonnull BackendStateType backendStateType,
 		@Nonnull Map<String, String> options,
-		@Nonnull Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshots,
+		@Nonnull Map<String, TypeSerializerSnapshot<?>> serializerSnapshots,
 		@Nonnull Map<String, TypeSerializer<?>> serializers) {
 		this.name = name;
 		this.backendStateType = backendStateType;
 		this.options = options;
-		this.serializerConfigSnapshots = serializerConfigSnapshots;
+		this.serializerSnapshots = serializerSnapshots;
 		this.serializers = serializers;
 	}
 
@@ -121,13 +121,13 @@ public class StateMetaInfoSnapshot {
 	}
 
 	@Nullable
-	public TypeSerializerSnapshot<?> getTypeSerializerConfigSnapshot(@Nonnull String key) {
-		return serializerConfigSnapshots.get(key);
+	public TypeSerializerSnapshot<?> getTypeSerializerSnapshot(@Nonnull String key) {
+		return serializerSnapshots.get(key);
 	}
 
 	@Nullable
-	public TypeSerializerSnapshot<?> getTypeSerializerConfigSnapshot(@Nonnull CommonSerializerKeys key) {
-		return getTypeSerializerConfigSnapshot(key.toString());
+	public TypeSerializerSnapshot<?> getTypeSerializerSnapshot(@Nonnull CommonSerializerKeys key) {
+		return getTypeSerializerSnapshot(key.toString());
 	}
 
 	@Nullable
@@ -150,20 +150,9 @@ public class StateMetaInfoSnapshot {
 		return name;
 	}
 
-	@Nullable
-	public TypeSerializer<?> restoreTypeSerializer(@Nonnull String key) {
-		TypeSerializerSnapshot<?> configSnapshot = getTypeSerializerConfigSnapshot(key);
-		return (configSnapshot != null) ? configSnapshot.restoreSerializer() : null;
-	}
-
-	@Nullable
-	public TypeSerializer<?> restoreTypeSerializer(@Nonnull CommonSerializerKeys key) {
-		return restoreTypeSerializer(key.toString());
-	}
-
 	@Nonnull
-	public Map<String, TypeSerializerSnapshot<?>> getSerializerConfigSnapshotsImmutable() {
-		return Collections.unmodifiableMap(serializerConfigSnapshots);
+	public Map<String, TypeSerializerSnapshot<?>> getSerializerSnapshotsImmutable() {
+		return Collections.unmodifiableMap(serializerSnapshots);
 	}
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
index 4408dfc..ad1e7be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
@@ -165,7 +165,7 @@ public class StateMetaInfoSnapshotReadersWriters {
 			@Nonnull DataOutputView outputView) throws IOException {
 			final Map<String, String> optionsMap = snapshot.getOptionsImmutable();
 			final Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
-				snapshot.getSerializerConfigSnapshotsImmutable();
+				snapshot.getSerializerSnapshotsImmutable();
 
 			outputView.writeUTF(snapshot.getName());
 			outputView.writeInt(snapshot.getBackendStateType().ordinal());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index c1f08e0..55aacb2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -251,7 +251,7 @@ public class SerializationProxiesTest {
 		Assert.assertEquals(expected.getBackendStateType(), actual.getBackendStateType());
 		Assert.assertEquals(expected.getOptionsImmutable(), actual.getOptionsImmutable());
 		Assert.assertEquals(
-			expected.getSerializerConfigSnapshotsImmutable(),
-			actual.getSerializerConfigSnapshotsImmutable());
+			expected.getSerializerSnapshotsImmutable(),
+			actual.getSerializerSnapshotsImmutable());
 	}
 }