You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/04/26 16:28:03 UTC
flink git commit: [FLINK-8841] Remove HashMapSerializer and use MapSerializer instead.
Repository: flink
Updated Branches:
refs/heads/master 0113ee2b3 -> 3ac282322
[FLINK-8841] Remove HashMapSerializer and use MapSerializer instead.
This closes #5910.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ac28232
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ac28232
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ac28232
Branch: refs/heads/master
Commit: 3ac282322a3940da9d69f81b9bc189b6d0c0463f
Parents: 0113ee2
Author: kkloudas <kk...@gmail.com>
Authored: Tue Apr 24 14:48:34 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Apr 26 18:27:25 2018 +0200
----------------------------------------------------------------------
.../apache/flink/util/InstantiationUtil.java | 59 ++++-
.../KVStateRequestSerializerRocksDBTest.java | 5 +-
.../network/KvStateRequestSerializerTest.java | 8 +-
.../state/AbstractKeyedStateBackend.java | 3 +-
.../flink/runtime/state/HashMapSerializer.java | 245 -------------------
.../state/heap/HeapKeyedStateBackend.java | 12 +-
.../flink/runtime/state/heap/HeapMapState.java | 34 +--
.../state/internal/InternalMapState.java | 2 +-
...pKeyedStateBackendSnapshotMigrationTest.java | 164 +++++++++++++
.../heap_keyed_statebackend_1_5_map.snapshot | Bin 0 -> 3613 bytes
.../state/RocksDBKeyedStateBackend.java | 2 +-
.../streaming/state/RocksDBMapState.java | 2 +-
12 files changed, 249 insertions(+), 287 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 978d270..3db0236 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -21,6 +21,7 @@ package org.apache.flink.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
@@ -41,8 +42,10 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
/**
@@ -194,10 +197,12 @@ public final class InstantiationUtil {
try {
Class.forName(streamClassDescriptor.getName(), false, classLoader);
} catch (ClassNotFoundException e) {
- if (streamClassDescriptor.getName().equals("org.apache.avro.generic.GenericData$Array")) {
- ObjectStreamClass result = ObjectStreamClass.lookup(
- KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class);
- return result;
+
+ final ObjectStreamClass equivalentSerializer =
+ MigrationUtil.getEquivalentSerializer(streamClassDescriptor.getName());
+
+ if (equivalentSerializer != null) {
+ return equivalentSerializer;
}
}
@@ -222,6 +227,52 @@ public final class InstantiationUtil {
}
/**
+ * A mapping between the full path of a deprecated serializer and its equivalent.
+ * These mappings are hardcoded and fixed.
+ *
+ * <p>IMPORTANT: mappings can be removed after 1 release as there will be a "migration path".
+ * As an example, a serializer is removed in 1.5-SNAPSHOT, then the mapping should be added for 1.5,
+ * and it can be removed in 1.6, as the path would be Flink-{< 1.5} -> Flink-1.5 -> Flink-{>= 1.6}.
+ */
+ private enum MigrationUtil {
+
+ // To add a new mapping, pick a constant name and add an entry shaped like the ones below.
+
+ GENERIC_DATA_ARRAY_SERIALIZER(
+ "org.apache.avro.generic.GenericData$Array",
+ ObjectStreamClass.lookup(KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class)),
+ HASH_MAP_SERIALIZER(
+ "org.apache.flink.runtime.state.HashMapSerializer",
+ ObjectStreamClass.lookup(MapSerializer.class)); // added in 1.5, when HashMapSerializer was removed
+
+ /** Unmodifiable lookup from deprecated class name to replacement descriptor; enum constants are initialized before this static field, so values() is complete when initMap() runs. */
+ private static final Map<String, ObjectStreamClass> EQUIVALENCE_MAP = Collections.unmodifiableMap(initMap());
+
+ /** The fully-qualified class name of the deprecated (old) class, as it appears in the serialized stream. */
+ private final String oldSerializerName;
+
+ /** The stream class descriptor returned in place of the old class when the old class cannot be loaded. */
+ private final ObjectStreamClass newSerializerStreamClass;
+
+ MigrationUtil(String oldSerializerName, ObjectStreamClass newSerializerStreamClass) {
+ this.oldSerializerName = oldSerializerName;
+ this.newSerializerStreamClass = newSerializerStreamClass;
+ }
+
+ private static Map<String, ObjectStreamClass> initMap() { // builds the old-name -> new-descriptor lookup from every constant
+ final Map<String, ObjectStreamClass> init = new HashMap<>(4);
+ for (MigrationUtil m: MigrationUtil.values()) {
+ init.put(m.oldSerializerName, m.newSerializerStreamClass);
+ }
+ return init;
+ }
+
+ private static ObjectStreamClass getEquivalentSerializer(String classDescriptorName) { // returns null when no mapping exists for the name
+ return EQUIVALENCE_MAP.get(classDescriptorName);
+ }
+ }
+
+ /**
* Creates a new instance of the given class.
*
* @param <T> The generic type of the class.
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 4985bf3..6ee7631 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -41,7 +41,6 @@ import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import java.io.File;
-import java.util.Map;
import static org.mockito.Mockito.mock;
@@ -160,8 +159,8 @@ public final class KVStateRequestSerializerRocksDBTest {
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>> mapState =
- (InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>>)
+ final InternalMapState<Long, VoidNamespace, Long, String> mapState =
+ (InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index dac1b90..1dc7186 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -307,8 +307,8 @@ public class KvStateRequestSerializerTest {
);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>> mapState =
- (InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>>)
+ final InternalMapState<Long, VoidNamespace, Long, String> mapState =
+ (InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
@@ -328,9 +328,9 @@ public class KvStateRequestSerializerTest {
*
* @throws Exception
*/
- public static <M extends Map<Long, String>> void testMapSerialization(
+ public static void testMapSerialization(
final long key,
- final InternalMapState<Long, VoidNamespace, Long, String, M> mapState) throws Exception {
+ final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {
TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 287474c..f873655 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -53,7 +53,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Map;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -232,7 +231,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
* @param <UK> Type of the keys in the state
* @param <UV> Type of the values in the state *
*/
- protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+ protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
deleted file mode 100644
index c1b6346..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A serializer for {@link HashMap}. The serializer relies on a key serializer and a value serializer
- * for the serialization of the map's key-value pairs.
- *
- * <p>The serialization format for the map is as follows: four bytes for the length of the map,
- * followed by the serialized representation of each key-value pair. To allow null values, each value
- * is prefixed by a null marker.
- *
- * @param <K> The type of the keys in the map.
- * @param <V> The type of the values in the map.
- */
-@Internal
-public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
-
- private static final long serialVersionUID = -6885593032367050078L;
-
- /** The serializer for the keys in the map */
- private final TypeSerializer<K> keySerializer;
-
- /** The serializer for the values in the map */
- private final TypeSerializer<V> valueSerializer;
-
- /**
- * Creates a map serializer that uses the given serializers to serialize the key-value pairs in the map.
- *
- * @param keySerializer The serializer for the keys in the map
- * @param valueSerializer The serializer for the values in the map
- */
- public HashMapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
- this.keySerializer = Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null");
- this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null.");
- }
-
- // ------------------------------------------------------------------------
- // HashMapSerializer specific properties
- // ------------------------------------------------------------------------
-
- public TypeSerializer<K> getKeySerializer() {
- return keySerializer;
- }
-
- public TypeSerializer<V> getValueSerializer() {
- return valueSerializer;
- }
-
- // ------------------------------------------------------------------------
- // Type Serializer implementation
- // ------------------------------------------------------------------------
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TypeSerializer<HashMap<K, V>> duplicate() {
- TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
- TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();
-
- return (duplicateKeySerializer == keySerializer) && (duplicateValueSerializer == valueSerializer)
- ? this
- : new HashMapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
- }
-
- @Override
- public HashMap<K, V> createInstance() {
- return new HashMap<>();
- }
-
- @Override
- public HashMap<K, V> copy(HashMap<K, V> from) {
- HashMap<K, V> newHashMap = new HashMap<>(from.size());
-
- for (Map.Entry<K, V> entry : from.entrySet()) {
- K newKey = keySerializer.copy(entry.getKey());
- V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue());
-
- newHashMap.put(newKey, newValue);
- }
-
- return newHashMap;
- }
-
- @Override
- public HashMap<K, V> copy(HashMap<K, V> from, HashMap<K, V> reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return -1; // var length
- }
-
- @Override
- public void serialize(HashMap<K, V> map, DataOutputView target) throws IOException {
- final int size = map.size();
- target.writeInt(size);
-
- for (Map.Entry<K, V> entry : map.entrySet()) {
- keySerializer.serialize(entry.getKey(), target);
-
- if (entry.getValue() == null) {
- target.writeBoolean(true);
- } else {
- target.writeBoolean(false);
- valueSerializer.serialize(entry.getValue(), target);
- }
- }
- }
-
- @Override
- public HashMap<K, V> deserialize(DataInputView source) throws IOException {
- final int size = source.readInt();
-
- final HashMap<K, V> map = new HashMap<>(size);
- for (int i = 0; i < size; ++i) {
- K key = keySerializer.deserialize(source);
-
- boolean isNull = source.readBoolean();
- V value = isNull ? null : valueSerializer.deserialize(source);
-
- map.put(key, value);
- }
-
- return map;
- }
-
- @Override
- public HashMap<K, V> deserialize(HashMap<K, V> reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- final int size = source.readInt();
- target.writeInt(size);
-
- for (int i = 0; i < size; ++i) {
- keySerializer.copy(source, target);
-
- boolean isNull = source.readBoolean();
- target.writeBoolean(isNull);
-
- if (!isNull) {
- valueSerializer.copy(source, target);
- }
- }
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj == this ||
- (obj != null && obj.getClass() == getClass() &&
- keySerializer.equals(((HashMapSerializer<?, ?>) obj).getKeySerializer()) &&
- valueSerializer.equals(((HashMapSerializer<?, ?>) obj).getValueSerializer()));
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return (obj != null && obj.getClass() == getClass());
- }
-
- @Override
- public int hashCode() {
- return keySerializer.hashCode() * 31 + valueSerializer.hashCode();
- }
-
- // --------------------------------------------------------------------------------------------
- // Serializer configuration snapshotting & compatibility
- // --------------------------------------------------------------------------------------------
-
- @Override
- public TypeSerializerConfigSnapshot snapshotConfiguration() {
- return new MapSerializerConfigSnapshot<>(keySerializer, valueSerializer);
- }
-
- @Override
- public CompatibilityResult<HashMap<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof MapSerializerConfigSnapshot) {
- List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
- ((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
- CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousKvSerializersAndConfigs.get(0).f0,
- UnloadableDummyTypeSerializer.class,
- previousKvSerializersAndConfigs.get(0).f1,
- keySerializer);
-
- CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- previousKvSerializersAndConfigs.get(1).f0,
- UnloadableDummyTypeSerializer.class,
- previousKvSerializersAndConfigs.get(1).f1,
- valueSerializer);
-
- if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration()) {
- return CompatibilityResult.compatible();
- } else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
- return CompatibilityResult.requiresMigration(
- new HashMapSerializer<>(
- new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
- new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer())));
- }
- }
-
- return CompatibilityResult.requiresMigration();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 82f883c..10803e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
-import org.apache.flink.runtime.state.HashMapSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -108,7 +107,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* but we can't put them here because different key/value states with different types and
* namespace types share this central list of tables.
*/
- private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+ private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
/**
* Map of state names to their corresponding restored state meta info.
@@ -291,16 +290,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+ protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
- StateTable<K, N, HashMap<UK, UV>> stateTable = tryRegisterStateTable(
- stateDesc.getName(),
- stateDesc.getType(),
- namespaceSerializer,
- new HashMapSerializer<>(stateDesc.getKeySerializer(), stateDesc.getValueSerializer()));
-
+ StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapMapState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 7c18071..ccd017f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
-import org.apache.flink.runtime.state.HashMapSerializer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.Preconditions;
@@ -40,8 +40,8 @@ import java.util.Map;
* @param <UV> The type of the values in the state.
*/
public class HeapMapState<K, N, UK, UV>
- extends AbstractHeapState<K, N, HashMap<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
- implements InternalMapState<K, N, UK, UV, HashMap<UK, UV>> {
+ extends AbstractHeapState<K, N, Map<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
+ implements InternalMapState<K, N, UK, UV> {
/**
* Creates a new key/value state for the given hash map of key/value pairs.
@@ -52,7 +52,7 @@ public class HeapMapState<K, N, UK, UV>
*/
public HeapMapState(
MapStateDescriptor<UK, UV> stateDesc,
- StateTable<K, N, HashMap<UK, UV>> stateTable,
+ StateTable<K, N, Map<UK, UV>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
super(stateDesc, stateTable, keySerializer, namespaceSerializer);
@@ -69,8 +69,8 @@ public class HeapMapState<K, N, UK, UV>
}
@Override
- public TypeSerializer<HashMap<UK, UV>> getValueSerializer() {
- return new HashMapSerializer<>(
+ public TypeSerializer<Map<UK, UV>> getValueSerializer() {
+ return new MapSerializer<>(
stateDesc.getKeySerializer(),
stateDesc.getValueSerializer()
);
@@ -79,7 +79,7 @@ public class HeapMapState<K, N, UK, UV>
@Override
public UV get(UK userKey) {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
if (userMap == null) {
return null;
@@ -91,7 +91,7 @@ public class HeapMapState<K, N, UK, UV>
@Override
public void put(UK userKey, UV userValue) {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
if (userMap == null) {
userMap = new HashMap<>();
stateTable.put(currentNamespace, userMap);
@@ -103,7 +103,7 @@ public class HeapMapState<K, N, UK, UV>
@Override
public void putAll(Map<UK, UV> value) {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
if (userMap == null) {
userMap = new HashMap<>();
@@ -116,7 +116,7 @@ public class HeapMapState<K, N, UK, UV>
@Override
public void remove(UK userKey) {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
if (userMap == null) {
return;
}
@@ -130,31 +130,31 @@ public class HeapMapState<K, N, UK, UV>
@Override
public boolean contains(UK userKey) {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap != null && userMap.containsKey(userKey);
}
@Override
public Iterable<Map.Entry<UK, UV>> entries() {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.entrySet();
}
@Override
public Iterable<UK> keys() {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.keySet();
}
@Override
public Iterable<UV> values() {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.values();
}
@Override
public Iterator<Map.Entry<UK, UV>> iterator() {
- HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+ Map<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null ? null : userMap.entrySet().iterator();
}
@@ -163,7 +163,7 @@ public class HeapMapState<K, N, UK, UV>
final byte[] serializedKeyAndNamespace,
final TypeSerializer<K> safeKeySerializer,
final TypeSerializer<N> safeNamespaceSerializer,
- final TypeSerializer<HashMap<UK, UV>> safeValueSerializer) throws Exception {
+ final TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
Preconditions.checkNotNull(serializedKeyAndNamespace);
Preconditions.checkNotNull(safeKeySerializer);
@@ -179,7 +179,7 @@ public class HeapMapState<K, N, UK, UV>
return null;
}
- final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) safeValueSerializer;
+ final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
index 91f698c..f9509e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
@@ -32,4 +32,4 @@ import java.util.Map;
* @param <UK> Type of the values folded into the state
* @param <UV> Type of the value in the state
*/
-public interface InternalMapState<K, N, UK, UV, ST extends Map<UK, UV>> extends InternalKvState<K, N, ST>, MapState<UK, UV> {}
+public interface InternalMapState<K, N, UK, UV> extends InternalKvState<K, N, Map<UK, UV>>, MapState<UK, UV> {}
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 815ceae..345cd4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -20,10 +20,16 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
@@ -33,6 +39,9 @@ import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.net.URL;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
@@ -42,6 +51,161 @@ import static org.junit.Assert.assertEquals;
*/
public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackendTestBase {
+	/**
+	 * Verifies that a heap keyed-state snapshot containing map state, written by Flink 1.5 while the
+	 * now-removed {@code HashMapSerializer} was still in use, can be restored, read back with the
+	 * expected contents, and snapshotted again.
+	 *
+	 * <p>The binary resource was produced by the (commented-out) {@code createBinarySnapshotWithMap()}.
+	 */
+	@Test
+	public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
+		ClassLoader cl = getClass().getClassLoader();
+		URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot");
+
+		Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
+
+		try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend()) {
+			final Integer namespace1 = 1;
+			final Integer namespace2 = 2;
+			final Integer namespace3 = 3;
+
+			final SnapshotResult<KeyedStateHandle> stateHandles;
+			// Read through openStream() instead of new FileInputStream(resource.getFile()): getFile()
+			// returns the URL-encoded path (e.g. a space becomes %20), which breaks in such directories,
+			// and it cannot address a resource packaged inside a jar.
+			try (BufferedInputStream bis = new BufferedInputStream(resource.openStream())) {
+				stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
+			}
+
+			final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
+			stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot()));
+
+			InternalMapState<String, Integer, Long, Long> state = keyedBackend.createMapState(IntSerializer.INSTANCE, stateDescr);
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(33L, (long) state.get(33L));
+			assertEquals(55L, (long) state.get(55L));
+			assertEquals(2, getStateSize(state));
+
+			state.setCurrentNamespace(namespace2);
+			assertEquals(22L, (long) state.get(22L));
+			assertEquals(11L, (long) state.get(11L));
+			assertEquals(2, getStateSize(state));
+
+			state.setCurrentNamespace(namespace3);
+			assertEquals(44L, (long) state.get(44L));
+			assertEquals(1, getStateSize(state));
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(11L, (long) state.get(11L));
+			assertEquals(44L, (long) state.get(44L));
+			assertEquals(2, getStateSize(state));
+
+			state.setCurrentNamespace(namespace3);
+			assertEquals(22L, (long) state.get(22L));
+			assertEquals(55L, (long) state.get(55L));
+			assertEquals(33L, (long) state.get(33L));
+			assertEquals(3, getStateSize(state));
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(11L, (long) state.get(11L));
+			assertEquals(22L, (long) state.get(22L));
+			assertEquals(33L, (long) state.get(33L));
+			assertEquals(44L, (long) state.get(44L));
+			assertEquals(55L, (long) state.get(55L));
+			assertEquals(5, getStateSize(state));
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			assertEquals(11L, (long) state.get(11L));
+			assertEquals(22L, (long) state.get(22L));
+			assertEquals(33L, (long) state.get(33L));
+			assertEquals(44L, (long) state.get(44L));
+			assertEquals(55L, (long) state.get(55L));
+			assertEquals(5, getStateSize(state));
+
+			// Re-snapshot the restored state to check it can be written again with the current serializers.
+			RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
+				1L,
+				1L,
+				new MemCheckpointStreamFactory(4 * 1024 * 1024),
+				CheckpointOptions.forCheckpointWithDefaultLocation());
+
+			snapshot.run();
+			// A FutureTask-style RunnableFuture stores an exception thrown during run() inside the future
+			// instead of propagating it; get() rethrows it (as ExecutionException) so a failing
+			// re-snapshot actually fails this test.
+			snapshot.get();
+		}
+	}
+
+ private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
+ int i = 0;
+ for (Iterator<Map.Entry<UK, UV>> itt = mapState.iterator(); itt.hasNext(); i++, itt.next()) {}
+ return i;
+ }
+
+// /**
+// * This code was used to create the binary file of the old version's (1.5) snapshot used by this test,
+// * i.e. a snapshot written while HashMapSerializer was still in use.
+// * If you need to recreate the binary, uncomment this method, set {@code pathToWrite}, and run it.
+// */
+// private void createBinarySnapshotWithMap() throws Exception {
+//
+// final String pathToWrite = "/PATH/TO/WRITE";
+//
+// final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
+// stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+//
+// final Integer namespace1 = 1;
+// final Integer namespace2 = 2;
+// final Integer namespace3 = 3;
+//
+// try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend()) {
+// InternalMapState<String, Integer, Long, Long> state = keyedBackend.createMapState(IntSerializer.INSTANCE, stateDescr);
+//
+// keyedBackend.setCurrentKey("abc");
+// state.setCurrentNamespace(namespace1);
+// state.put(33L, 33L);
+// state.put(55L, 55L);
+//
+// state.setCurrentNamespace(namespace2);
+// state.put(22L, 22L);
+// state.put(11L, 11L);
+//
+// state.setCurrentNamespace(namespace3);
+// state.put(44L, 44L);
+//
+// keyedBackend.setCurrentKey("def");
+// state.setCurrentNamespace(namespace1);
+// state.put(11L, 11L);
+// state.put(44L, 44L);
+//
+// state.setCurrentNamespace(namespace3);
+// state.put(22L, 22L);
+// state.put(55L, 55L);
+// state.put(33L, 33L);
+//
+// keyedBackend.setCurrentKey("jkl");
+// state.setCurrentNamespace(namespace1);
+// state.put(11L, 11L);
+// state.put(22L, 22L);
+// state.put(33L, 33L);
+// state.put(44L, 44L);
+// state.put(55L, 55L);
+//
+// keyedBackend.setCurrentKey("mno");
+// state.setCurrentNamespace(namespace3);
+// state.put(11L, 11L);
+// state.put(22L, 22L);
+// state.put(33L, 33L);
+// state.put(44L, 44L);
+// state.put(55L, 55L);
+//
+// RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
+// 0L,
+// 0L,
+// new MemCheckpointStreamFactory(4 * 1024 * 1024),
+// CheckpointOptions.forCheckpointWithDefaultLocation());
+//
+// snapshot.run();
+//
+// try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite))) {
+// InstantiationUtil.serializeObject(bis, snapshot.get());
+// }
+// }
+// }
+
/**
* [FLINK-5979]
*
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot
new file mode 100644
index 0000000..32c301f
Binary files /dev/null and b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ad40c70..69ce95c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1251,7 +1251,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+ protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index baa90fa..fbc55dc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -60,7 +60,7 @@ import java.util.Map;
*/
public class RocksDBMapState<K, N, UK, UV>
extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
- implements InternalMapState<K, N, UK, UV, Map<UK, UV>> {
+ implements InternalMapState<K, N, UK, UV> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);