You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/04/26 16:28:03 UTC

flink git commit: [FLINK-8841] Remove HashMapSerializer and use MapSerializer instead.

Repository: flink
Updated Branches:
  refs/heads/master 0113ee2b3 -> 3ac282322


[FLINK-8841] Remove HashMapSerializer and use MapSerializer instead.

This closes #5910.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ac28232
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ac28232
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ac28232

Branch: refs/heads/master
Commit: 3ac282322a3940da9d69f81b9bc189b6d0c0463f
Parents: 0113ee2
Author: kkloudas <kk...@gmail.com>
Authored: Tue Apr 24 14:48:34 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Apr 26 18:27:25 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/util/InstantiationUtil.java    |  59 ++++-
 .../KVStateRequestSerializerRocksDBTest.java    |   5 +-
 .../network/KvStateRequestSerializerTest.java   |   8 +-
 .../state/AbstractKeyedStateBackend.java        |   3 +-
 .../flink/runtime/state/HashMapSerializer.java  | 245 -------------------
 .../state/heap/HeapKeyedStateBackend.java       |  12 +-
 .../flink/runtime/state/heap/HeapMapState.java  |  34 +--
 .../state/internal/InternalMapState.java        |   2 +-
 ...pKeyedStateBackendSnapshotMigrationTest.java | 164 +++++++++++++
 .../heap_keyed_statebackend_1_5_map.snapshot    | Bin 0 -> 3613 bytes
 .../state/RocksDBKeyedStateBackend.java         |   2 +-
 .../streaming/state/RocksDBMapState.java        |   2 +-
 12 files changed, 249 insertions(+), 287 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 978d270..3db0236 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -21,6 +21,7 @@ package org.apache.flink.util;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
@@ -41,8 +42,10 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Modifier;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -194,10 +197,12 @@ public final class InstantiationUtil {
 			try {
 				Class.forName(streamClassDescriptor.getName(), false, classLoader);
 			} catch (ClassNotFoundException e) {
-				if (streamClassDescriptor.getName().equals("org.apache.avro.generic.GenericData$Array")) {
-					ObjectStreamClass result = ObjectStreamClass.lookup(
-						KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class);
-					return result;
+
+				final ObjectStreamClass equivalentSerializer =
+						MigrationUtil.getEquivalentSerializer(streamClassDescriptor.getName());
+
+				if (equivalentSerializer != null) {
+					return equivalentSerializer;
 				}
 			}
 
@@ -222,6 +227,52 @@ public final class InstantiationUtil {
 	}
 
 	/**
+	 * A mapping between the fully qualified class name of a removed serializer and the stream class descriptor of its equivalent.
+	 * These mappings are hardcoded and fixed.
+	 *
+	 * <p>IMPORTANT: a mapping can be removed one release after it was added, because there will be a "migration path".
+	 * For example, if a serializer is removed in 1.5-SNAPSHOT, the mapping should be added for 1.5
+	 * and can be removed in 1.6, as the path would be Flink-{< 1.5} -> Flink-1.5 -> Flink-{>= 1.6}.
+	 */
+	private enum MigrationUtil {
+
+		// To add a new mapping, pick a constant name and add an entry like the ones below:
+
+		GENERIC_DATA_ARRAY_SERIALIZER(
+				"org.apache.avro.generic.GenericData$Array",
+				ObjectStreamClass.lookup(KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class)),
+		HASH_MAP_SERIALIZER(
+				"org.apache.flink.runtime.state.HashMapSerializer",
+				ObjectStreamClass.lookup(MapSerializer.class)); // added in 1.5
+
+		/** An internal unmodifiable map containing the mappings between deprecated and new serializers. */
+		private static final Map<String, ObjectStreamClass> EQUIVALENCE_MAP = Collections.unmodifiableMap(initMap());
+
+		/** The full name of the class of the old serializer. */
+		private final String oldSerializerName;
+
+		/** The serialization descriptor of the class of the new serializer. */
+		private final ObjectStreamClass newSerializerStreamClass;
+
+		MigrationUtil(String oldSerializerName, ObjectStreamClass newSerializerStreamClass) {
+			this.oldSerializerName = oldSerializerName;
+			this.newSerializerStreamClass = newSerializerStreamClass;
+		}
+
+		private static Map<String, ObjectStreamClass> initMap() {
+			final Map<String, ObjectStreamClass> init = new HashMap<>(4);
+			for (MigrationUtil m: MigrationUtil.values()) {
+				init.put(m.oldSerializerName, m.newSerializerStreamClass);
+			}
+			return init;
+		}
+
+		private static ObjectStreamClass getEquivalentSerializer(String classDescriptorName) {
+			return EQUIVALENCE_MAP.get(classDescriptorName);
+		}
+	}
+
+	/**
 	 * Creates a new instance of the given class.
 	 *
 	 * @param <T> The generic type of the class.

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 4985bf3..6ee7631 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -41,7 +41,6 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
 import java.io.File;
-import java.util.Map;
 
 import static org.mockito.Mockito.mock;
 
@@ -160,8 +159,8 @@ public final class KVStateRequestSerializerRocksDBTest {
 		longHeapKeyedStateBackend.restore(null);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>> mapState =
-				(InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>>)
+		final InternalMapState<Long, VoidNamespace, Long, String> mapState =
+				(InternalMapState<Long, VoidNamespace, Long, String>)
 						longHeapKeyedStateBackend.getPartitionedState(
 								VoidNamespace.INSTANCE,
 								VoidNamespaceSerializer.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index dac1b90..1dc7186 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -307,8 +307,8 @@ public class KvStateRequestSerializerTest {
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
-		final InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>> mapState =
-				(InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>>)
+		final InternalMapState<Long, VoidNamespace, Long, String> mapState =
+				(InternalMapState<Long, VoidNamespace, Long, String>)
 						longHeapKeyedStateBackend.getPartitionedState(
 								VoidNamespace.INSTANCE,
 								VoidNamespaceSerializer.INSTANCE,
@@ -328,9 +328,9 @@ public class KvStateRequestSerializerTest {
 	 *
 	 * @throws Exception
 	 */
-	public static <M extends Map<Long, String>> void testMapSerialization(
+	public static void testMapSerialization(
 			final long key,
-			final InternalMapState<Long, VoidNamespace, Long, String, M> mapState) throws Exception {
+			final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {
 
 		TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
 		TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 287474c..f873655 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -53,7 +53,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -232,7 +231,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
 	 * @param <UK> Type of the keys in the state
 	 * @param <UV> Type of the values in the state	 *
 	 */
-	protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+	protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
 			TypeSerializer<N> namespaceSerializer,
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
deleted file mode 100644
index c1b6346..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A serializer for {@link HashMap}. The serializer relies on a key serializer and a value serializer
- * for the serialization of the map's key-value pairs.
- *
- * <p>The serialization format for the map is as follows: four bytes for the length of the map,
- * followed by the serialized representation of each key-value pair. To allow null values, each value
- * is prefixed by a null marker.
- *
- * @param <K> The type of the keys in the map.
- * @param <V> The type of the values in the map.
- */
-@Internal
-public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> {
-
-	private static final long serialVersionUID = -6885593032367050078L;
-	
-	/** The serializer for the keys in the map */
-	private final TypeSerializer<K> keySerializer;
-
-	/** The serializer for the values in the map */
-	private final TypeSerializer<V> valueSerializer;
-
-	/**
-	 * Creates a map serializer that uses the given serializers to serialize the key-value pairs in the map.
-	 *
-	 * @param keySerializer The serializer for the keys in the map
-	 * @param valueSerializer The serializer for the values in the map
-	 */
-	public HashMapSerializer(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer) {
-		this.keySerializer = Preconditions.checkNotNull(keySerializer, "The key serializer cannot be null");
-		this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "The value serializer cannot be null.");
-	}
-
-	// ------------------------------------------------------------------------
-	//  HashMapSerializer specific properties
-	// ------------------------------------------------------------------------
-
-	public TypeSerializer<K> getKeySerializer() {
-		return keySerializer;
-	}
-
-	public TypeSerializer<V> getValueSerializer() {
-		return valueSerializer;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Type Serializer implementation
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<HashMap<K, V>> duplicate() {
-		TypeSerializer<K> duplicateKeySerializer = keySerializer.duplicate();
-		TypeSerializer<V> duplicateValueSerializer = valueSerializer.duplicate();
-
-		return (duplicateKeySerializer == keySerializer) && (duplicateValueSerializer == valueSerializer)
-				? this
-				: new HashMapSerializer<>(duplicateKeySerializer, duplicateValueSerializer);
-	}
-
-	@Override
-	public HashMap<K, V> createInstance() {
-		return new HashMap<>();
-	}
-
-	@Override
-	public HashMap<K, V> copy(HashMap<K, V> from) {
-		HashMap<K, V> newHashMap = new HashMap<>(from.size());
-
-		for (Map.Entry<K, V> entry : from.entrySet()) {
-			K newKey = keySerializer.copy(entry.getKey());
-			V newValue = entry.getValue() == null ? null : valueSerializer.copy(entry.getValue());
-
-			newHashMap.put(newKey, newValue);
-		}
-
-		return newHashMap;
-	}
-
-	@Override
-	public HashMap<K, V> copy(HashMap<K, V> from, HashMap<K, V> reuse) {
-		return copy(from);
-	}
-
-	@Override
-	public int getLength() {
-		return -1; // var length
-	}
-
-	@Override
-	public void serialize(HashMap<K, V> map, DataOutputView target) throws IOException {
-		final int size = map.size();
-		target.writeInt(size);
-
-		for (Map.Entry<K, V> entry : map.entrySet()) {
-			keySerializer.serialize(entry.getKey(), target);
-
-			if (entry.getValue() == null) {
-				target.writeBoolean(true);
-			} else {
-				target.writeBoolean(false);
-				valueSerializer.serialize(entry.getValue(), target);
-			}
-		}
-	}
-
-	@Override
-	public HashMap<K, V> deserialize(DataInputView source) throws IOException {
-		final int size = source.readInt();
-
-		final HashMap<K, V> map = new HashMap<>(size);
-		for (int i = 0; i < size; ++i) {
-			K key = keySerializer.deserialize(source);
-
-			boolean isNull = source.readBoolean();
-			V value = isNull ? null : valueSerializer.deserialize(source);
-
-			map.put(key, value);
-		}
-
-		return map;
-	}
-
-	@Override
-	public HashMap<K, V> deserialize(HashMap<K, V> reuse, DataInputView source) throws IOException {
-		return deserialize(source);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		final int size = source.readInt();
-		target.writeInt(size);
-
-		for (int i = 0; i < size; ++i) {
-			keySerializer.copy(source, target);
-			
-			boolean isNull = source.readBoolean();
-			target.writeBoolean(isNull);
-			
-			if (!isNull) {
-				valueSerializer.copy(source, target);
-			}
-		}
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		return obj == this ||
-				(obj != null && obj.getClass() == getClass() &&
-						keySerializer.equals(((HashMapSerializer<?, ?>) obj).getKeySerializer()) &&
-						valueSerializer.equals(((HashMapSerializer<?, ?>) obj).getValueSerializer()));
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return (obj != null && obj.getClass() == getClass());
-	}
-
-	@Override
-	public int hashCode() {
-		return keySerializer.hashCode() * 31 + valueSerializer.hashCode();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Serializer configuration snapshotting & compatibility
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public TypeSerializerConfigSnapshot snapshotConfiguration() {
-		return new MapSerializerConfigSnapshot<>(keySerializer, valueSerializer);
-	}
-
-	@Override
-	public CompatibilityResult<HashMap<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-		if (configSnapshot instanceof MapSerializerConfigSnapshot) {
-			List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
-				((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
-
-			CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousKvSerializersAndConfigs.get(0).f0,
-					UnloadableDummyTypeSerializer.class,
-					previousKvSerializersAndConfigs.get(0).f1,
-					keySerializer);
-
-			CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-					previousKvSerializersAndConfigs.get(1).f0,
-					UnloadableDummyTypeSerializer.class,
-					previousKvSerializersAndConfigs.get(1).f1,
-					valueSerializer);
-
-			if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration()) {
-				return CompatibilityResult.compatible();
-			} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
-				return CompatibilityResult.requiresMigration(
-					new HashMapSerializer<>(
-						new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
-						new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer())));
-			}
-		}
-
-		return CompatibilityResult.requiresMigration();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 82f883c..10803e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.DoneFuture;
-import org.apache.flink.runtime.state.HashMapSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -108,7 +107,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * but we can't put them here because different key/value states with different types and
 	 * namespace types share this central list of tables.
 	 */
-	private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
+	private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();
 
 	/**
 	 * Map of state names to their corresponding restored state meta info.
@@ -291,16 +290,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
 			TypeSerializer<N> namespaceSerializer,
 			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 
-		StateTable<K, N, HashMap<UK, UV>> stateTable = tryRegisterStateTable(
-				stateDesc.getName(),
-				stateDesc.getType(),
-				namespaceSerializer,
-				new HashMapSerializer<>(stateDesc.getKeySerializer(), stateDesc.getValueSerializer()));
-
+		StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapMapState<>(stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 7c18071..ccd017f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
-import org.apache.flink.runtime.state.HashMapSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
 
@@ -40,8 +40,8 @@ import java.util.Map;
  * @param <UV> The type of the values in the state.
  */
 public class HeapMapState<K, N, UK, UV>
-		extends AbstractHeapState<K, N, HashMap<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
-		implements InternalMapState<K, N, UK, UV, HashMap<UK, UV>> {
+		extends AbstractHeapState<K, N, Map<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
+		implements InternalMapState<K, N, UK, UV> {
 
 	/**
 	 * Creates a new key/value state for the given hash map of key/value pairs.
@@ -52,7 +52,7 @@ public class HeapMapState<K, N, UK, UV>
 	 */
 	public HeapMapState(
 			MapStateDescriptor<UK, UV> stateDesc,
-			StateTable<K, N, HashMap<UK, UV>> stateTable,
+			StateTable<K, N, Map<UK, UV>> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 		super(stateDesc, stateTable, keySerializer, namespaceSerializer);
@@ -69,8 +69,8 @@ public class HeapMapState<K, N, UK, UV>
 	}
 
 	@Override
-	public TypeSerializer<HashMap<UK, UV>> getValueSerializer() {
-		return new HashMapSerializer<>(
+	public TypeSerializer<Map<UK, UV>> getValueSerializer() {
+		return new MapSerializer<>(
 				stateDesc.getKeySerializer(),
 				stateDesc.getValueSerializer()
 		);
@@ -79,7 +79,7 @@ public class HeapMapState<K, N, UK, UV>
 	@Override
 	public UV get(UK userKey) {
 
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 
 		if (userMap == null) {
 			return null;
@@ -91,7 +91,7 @@ public class HeapMapState<K, N, UK, UV>
 	@Override
 	public void put(UK userKey, UV userValue) {
 
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 		if (userMap == null) {
 			userMap = new HashMap<>();
 			stateTable.put(currentNamespace, userMap);
@@ -103,7 +103,7 @@ public class HeapMapState<K, N, UK, UV>
 	@Override
 	public void putAll(Map<UK, UV> value) {
 
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 
 		if (userMap == null) {
 			userMap = new HashMap<>();
@@ -116,7 +116,7 @@ public class HeapMapState<K, N, UK, UV>
 	@Override
 	public void remove(UK userKey) {
 
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 		if (userMap == null) {
 			return;
 		}
@@ -130,31 +130,31 @@ public class HeapMapState<K, N, UK, UV>
 
 	@Override
 	public boolean contains(UK userKey) {
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap != null && userMap.containsKey(userKey);
 	}
 
 	@Override
 	public Iterable<Map.Entry<UK, UV>> entries() {
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.entrySet();
 	}
 	
 	@Override
 	public Iterable<UK> keys() {
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.keySet();
 	}
 
 	@Override
 	public Iterable<UV> values() {
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.values();
 	}
 
 	@Override
 	public Iterator<Map.Entry<UK, UV>> iterator() {
-		HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
+		Map<UK, UV> userMap = stateTable.get(currentNamespace);
 		return userMap == null ? null : userMap.entrySet().iterator();
 	}
 
@@ -163,7 +163,7 @@ public class HeapMapState<K, N, UK, UV>
 			final byte[] serializedKeyAndNamespace,
 			final TypeSerializer<K> safeKeySerializer,
 			final TypeSerializer<N> safeNamespaceSerializer,
-			final TypeSerializer<HashMap<UK, UV>> safeValueSerializer) throws Exception {
+			final TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
 
 		Preconditions.checkNotNull(serializedKeyAndNamespace);
 		Preconditions.checkNotNull(safeKeySerializer);
@@ -179,7 +179,7 @@ public class HeapMapState<K, N, UK, UV>
 			return null;
 		}
 
-		final HashMapSerializer<UK, UV> serializer = (HashMapSerializer<UK, UV>) safeValueSerializer;
+		final MapSerializer<UK, UV> serializer = (MapSerializer<UK, UV>) safeValueSerializer;
 
 		final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
 		final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
index 91f698c..f9509e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/internal/InternalMapState.java
@@ -32,4 +32,4 @@ import java.util.Map;
  * @param <UK> Type of the values folded into the state
  * @param <UV> Type of the value in the state
  */
-public interface InternalMapState<K, N, UK, UV, ST extends Map<UK, UV>> extends InternalKvState<K, N, ST>, MapState<UK, UV> {}
+public interface InternalMapState<K, N, UK, UV> extends InternalKvState<K, N, Map<UK, UV>>, MapState<UK, UV> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 815ceae..345cd4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -20,10 +20,16 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -33,6 +39,9 @@ import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.net.URL;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -42,6 +51,161 @@ import static org.junit.Assert.assertEquals;
  */
 public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackendTestBase {
 
+	@Test
+	public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception {
+		// Restore a map-state snapshot written before HashMapSerializer was removed and verify its contents.
+		ClassLoader cl = getClass().getClassLoader();
+		URL resource = cl.getResource("heap_keyed_statebackend_1_5_map.snapshot");
+		Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
+
+		try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend()) {
+			final Integer namespace1 = 1;
+			final Integer namespace2 = 2;
+			final Integer namespace3 = 3;
+			// Read via URL.openStream(): URL.getFile() keeps URL escapes (e.g. %20), which breaks FileInputStream.
+			final SnapshotResult<KeyedStateHandle> stateHandles;
+			try (BufferedInputStream bis = new BufferedInputStream(resource.openStream())) {
+				stateHandles = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
+			}
+
+			final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
+			stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+			keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot()));
+
+			InternalMapState<String, Integer, Long, Long> state = keyedBackend.createMapState(IntSerializer.INSTANCE, stateDescr);
+
+			keyedBackend.setCurrentKey("abc");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(33L, (long) state.get(33L));
+			assertEquals(55L, (long) state.get(55L));
+			assertEquals(2, getStateSize(state));
+
+			state.setCurrentNamespace(namespace2);
+			assertEquals(22L, (long) state.get(22L));
+			assertEquals(11L, (long) state.get(11L));
+			assertEquals(2, getStateSize(state));
+
+			state.setCurrentNamespace(namespace3);
+			assertEquals(44L, (long) state.get(44L));
+			assertEquals(1, getStateSize(state));
+
+			keyedBackend.setCurrentKey("def");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(11L, (long) state.get(11L));
+			assertEquals(44L, (long) state.get(44L));
+			assertEquals(2, getStateSize(state));
+
+			state.setCurrentNamespace(namespace3);
+			assertEquals(22L, (long) state.get(22L));
+			assertEquals(55L, (long) state.get(55L));
+			assertEquals(33L, (long) state.get(33L));
+			assertEquals(3, getStateSize(state));
+
+			keyedBackend.setCurrentKey("jkl");
+			state.setCurrentNamespace(namespace1);
+			assertEquals(11L, (long) state.get(11L));
+			assertEquals(22L, (long) state.get(22L));
+			assertEquals(33L, (long) state.get(33L));
+			assertEquals(44L, (long) state.get(44L));
+			assertEquals(55L, (long) state.get(55L));
+			assertEquals(5, getStateSize(state));
+
+			keyedBackend.setCurrentKey("mno");
+			state.setCurrentNamespace(namespace3);
+			assertEquals(11L, (long) state.get(11L));
+			assertEquals(22L, (long) state.get(22L));
+			assertEquals(33L, (long) state.get(33L));
+			assertEquals(44L, (long) state.get(44L));
+			assertEquals(55L, (long) state.get(55L));
+			assertEquals(5, getStateSize(state));
+			// Finally, take a fresh snapshot of the restored backend.
+			RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
+					1L,
+					1L,
+					new MemCheckpointStreamFactory(4 * 1024 * 1024),
+					CheckpointOptions.forCheckpointWithDefaultLocation());
+			snapshot.run();
+			snapshot.get(); // rethrows a failure, if any, that the future recorded while running
+		}
+	}
+
+	private <K, N, UK, UV> int getStateSize(InternalMapState<K, N, UK, UV> mapState) throws Exception {
+		int entryCount = 0; // number of entries seen while draining the state's iterator
+		for (Iterator<Map.Entry<UK, UV>> entries = mapState.iterator(); entries.hasNext(); entries.next()) { entryCount++; }
+		return entryCount;
+	}
+
+//	/**
+//	 * This code was used to create the binary file of the old version's (< 1.5) snapshot used by this test.
+//	 * If you need to recreate the binary, you can uncomment this method and run it.
+//	 */
+//	private void createBinarySnapshotWithMap() throws Exception {
+//
+//		final String pathToWrite = "/PATH/TO/WRITE";
+//
+//		final MapStateDescriptor<Long, Long> stateDescr = new MapStateDescriptor<>("my-map-state", Long.class, Long.class);
+//		stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+//
+//		final Integer namespace1 = 1;
+//		final Integer namespace2 = 2;
+//		final Integer namespace3 = 3;
+//
+//		try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend()) {
+//			InternalMapState<String, Integer, Long, Long> state = keyedBackend.createMapState(IntSerializer.INSTANCE, stateDescr);
+//
+//			keyedBackend.setCurrentKey("abc");
+//			state.setCurrentNamespace(namespace1);
+//			state.put(33L, 33L);
+//			state.put(55L, 55L);
+//
+//			state.setCurrentNamespace(namespace2);
+//			state.put(22L, 22L);
+//			state.put(11L, 11L);
+//
+//			state.setCurrentNamespace(namespace3);
+//			state.put(44L, 44L);
+//
+//			keyedBackend.setCurrentKey("def");
+//			state.setCurrentNamespace(namespace1);
+//			state.put(11L, 11L);
+//			state.put(44L, 44L);
+//
+//			state.setCurrentNamespace(namespace3);
+//			state.put(22L, 22L);
+//			state.put(55L, 55L);
+//			state.put(33L, 33L);
+//
+//			keyedBackend.setCurrentKey("jkl");
+//			state.setCurrentNamespace(namespace1);
+//			state.put(11L, 11L);
+//			state.put(22L, 22L);
+//			state.put(33L, 33L);
+//			state.put(44L, 44L);
+//			state.put(55L, 55L);
+//
+//			keyedBackend.setCurrentKey("mno");
+//			state.setCurrentNamespace(namespace3);
+//			state.put(11L, 11L);
+//			state.put(22L, 22L);
+//			state.put(33L, 33L);
+//			state.put(44L, 44L);
+//			state.put(55L, 55L);
+//
+//			RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = keyedBackend.snapshot(
+//					0L,
+//					0L,
+//					new MemCheckpointStreamFactory(4 * 1024 * 1024),
+//					CheckpointOptions.forCheckpointWithDefaultLocation());
+//
+//			snapshot.run();
+//
+//			try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite))) {
+//				InstantiationUtil.serializeObject(bis, snapshot.get());
+//			}
+//		}
+//	}
+
 	/**
 	 * [FLINK-5979]
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot
new file mode 100644
index 0000000..32c301f
Binary files /dev/null and b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_5_map.snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ad40c70..69ce95c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1251,7 +1251,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@Override
-	protected <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
+	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
 		TypeSerializer<N> namespaceSerializer,
 		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3ac28232/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index baa90fa..fbc55dc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -60,7 +60,7 @@ import java.util.Map;
  */
 public class RocksDBMapState<K, N, UK, UV>
 		extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>, MapStateDescriptor<UK, UV>>
-		implements InternalMapState<K, N, UK, UV, Map<UK, UV>> {
+		implements InternalMapState<K, N, UK, UV> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);