You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/15 12:21:13 UTC

[2/2] flink git commit: [FLINK-5790] [core] Followups and tests for the StateDescriptor changes

[FLINK-5790] [core] Followups and tests for the StateDescriptor changes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2045cc5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2045cc5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2045cc5f

Branch: refs/heads/master
Commit: 2045cc5f84ab39f18f423154c5620a79ac6d44ba
Parents: d47446c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 14 15:32:03 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 13:10:30 2017 +0100

----------------------------------------------------------------------
 .../api/common/state/ListStateDescriptor.java   | 45 +++++++----
 .../common/typeutils/base/ListSerializer.java   | 50 ++++++++++--
 .../flink/api/java/typeutils/ListTypeInfo.java  | 45 ++++++++---
 .../common/typeutils/SerializerTestBase.java    |  7 +-
 .../typeutils/base/ListSerializerTest.java      | 83 ++++++++++++++++++++
 .../api/common/state/ListStateDescriptor.java   | 10 +--
 .../state/heap/HeapKeyedStateBackend.java       | 40 ++++++----
 .../flink/runtime/state/heap/HeapListState.java | 29 ++++---
 .../runtime/state/heap/HeapListStateTest.java   |  7 +-
 9 files changed, 237 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 2047e24..c03f8cb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -27,8 +27,14 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import java.util.List;
 
 /**
- * A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
- * list state using
+ * A {@link StateDescriptor} for {@link ListState}. This can be used to create state where the type
+ * is a list that can be appended and iterated over.
+ * 
+ * <p>Using {@code ListState} is typically more efficient than manually maintaining a list in a
+ * {@link ValueState}, because the backing implementation can support efficient appends, rather than
+ * replacing the full list on write.
+ * 
+ * <p>To create keyed list state (on a KeyedStream), use 
  * {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}.
  *
  * @param <T> The type of the values that can be added to the list state.
@@ -46,7 +52,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	 * @param name The (unique) name for the state.
 	 * @param elementTypeClass The type of the elements in the state.
 	 */
-	@SuppressWarnings("unchecked")
 	public ListStateDescriptor(String name, Class<T> elementTypeClass) {
 		super(name, new ListTypeInfo<>(elementTypeClass), null);
 	}
@@ -57,7 +62,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	 * @param name The (unique) name for the state.
 	 * @param elementTypeInfo The type of the elements in the state.
 	 */
-	@SuppressWarnings("unchecked")
 	public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {
 		super(name, new ListTypeInfo<>(elementTypeInfo), null);
 	}
@@ -68,26 +72,39 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	 * @param name The (unique) name for the state.
 	 * @param typeSerializer The type serializer for the list values.
 	 */
-	@SuppressWarnings("unchecked")
 	public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
 		super(name, new ListSerializer<>(typeSerializer), null);
 	}
 
+	// ------------------------------------------------------------------------
+
+	@Override
+	public ListState<T> bind(StateBackend stateBackend) throws Exception {
+		return stateBackend.createListState(this);
+	}
+
+	/**
+	 * Gets the serializer for the elements contained in the list.
+	 * 
+	 * @return The serializer for the elements in the list.
+	 */
 	public TypeSerializer<T> getElementSerializer() {
-		if (!(serializer instanceof ListSerializer)) {
+		// call getSerializer() here to get the initialization check and proper error message
+		final TypeSerializer<List<T>> rawSerializer = getSerializer();
+		if (!(rawSerializer instanceof ListSerializer)) {
 			throw new IllegalStateException();
 		}
 
 		return ((ListSerializer<T>)serializer).getElementSerializer();
 	}
 
-	// ------------------------------------------------------------------------
-
 	@Override
-	public ListState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createListState(this);
+	public Type getType() {
+		return Type.LIST;
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -97,8 +114,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 			return false;
 		}
 
-		ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
+		final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
 		return serializer.equals(that.serializer) && name.equals(that.name);
 
 	}
@@ -116,9 +132,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 				"serializer=" + serializer +
 				'}';
 	}
-
-	@Override
-	public Type getType() {
-		return Type.LIST;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index a875a3b..ca3c143 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -26,21 +26,49 @@ import java.io.IOException;
 import java.util.List;
 import java.util.ArrayList;
 
-@SuppressWarnings("ForLoopReplaceableByForEach")
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link List Lists}. The serializer relies on an element serializer
+ * for the serialization of the list's elements.
+ * 
+ * <p>The serialization format for the list is as follows: four bytes for the length of the list,
+ * followed by the serialized representation of each element.
+ * 
+ * @param <T> The type of element in the list.
+ */
 public class ListSerializer<T> extends TypeSerializer<List<T>> {
 
 	private static final long serialVersionUID = 1119562170939152304L;
 
+	/** The serializer for the elements of the list */
 	private final TypeSerializer<T> elementSerializer;
 
+	/**
+	 * Creates a list serializer that uses the given serializer to serialize the list's elements.
+	 * 
+	 * @param elementSerializer The serializer for the elements of the list
+	 */
 	public ListSerializer(TypeSerializer<T> elementSerializer) {
-		this.elementSerializer = elementSerializer;
+		this.elementSerializer = checkNotNull(elementSerializer);
 	}
 
+	// ------------------------------------------------------------------------
+	//  ListSerializer specific properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the serializer for the elements of the list.
+	 * @return The serializer for the elements of the list
+	 */
 	public TypeSerializer<T> getElementSerializer() {
 		return elementSerializer;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Type Serializer implementation
+	// ------------------------------------------------------------------------
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -54,14 +82,18 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> {
 
 	@Override
 	public List<T> createInstance() {
-		return new ArrayList<>();
+		return new ArrayList<>(0);
 	}
 
 	@Override
 	public List<T> copy(List<T> from) {
 		List<T> newList = new ArrayList<>(from.size());
-		for (int i = 0; i < from.size(); i++) {
-			newList.add(elementSerializer.copy(from.get(i)));
+
+		// We iterate here rather than accessing by index, because we cannot be sure that
+		// the given list supports RandomAccess.
+		// The Iterator should be stack allocated on new JVMs (due to escape analysis)
+		for (T element : from) {
+			newList.add(elementSerializer.copy(element));
 		}
 		return newList;
 	}
@@ -80,8 +112,12 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> {
 	public void serialize(List<T> list, DataOutputView target) throws IOException {
 		final int size = list.size();
 		target.writeInt(size);
-		for (int i = 0; i < size; i++) {
-			elementSerializer.serialize(list.get(i), target);
+
+		// We iterate here rather than accessing by index, because we cannot be sure that
+		// the given list supports RandomAccess.
+		// The Iterator should be stack allocated on new JVMs (due to escape analysis)
+		for (T element : list) {
+			elementSerializer.serialize(element, target);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
index e70aaf8..763be98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -26,30 +26,44 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
 
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link TypeInformation} for the list types of the Java API.
  *
  * @param <T> The type of the elements in the list.
  */
-
-
-@Public
+@PublicEvolving
 public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
 
+	private static final long serialVersionUID = 1L;
+
 	private final TypeInformation<T> elementTypeInfo;
 
+
 	public ListTypeInfo(Class<T> elementTypeClass) {
-		this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+		this.elementTypeInfo = of(checkNotNull(elementTypeClass, "elementTypeClass"));
 	}
 
 	public ListTypeInfo(TypeInformation<T> elementTypeInfo) {
-		this.elementTypeInfo = elementTypeInfo;
+		this.elementTypeInfo = checkNotNull(elementTypeInfo, "elementTypeInfo");
 	}
 
+	// ------------------------------------------------------------------------
+	//  ListTypeInfo specific properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the type information for the elements contained in the list
+	 */
 	public TypeInformation<T> getElementTypeInfo() {
 		return elementTypeInfo;
 	}
 
+	// ------------------------------------------------------------------------
+	//  TypeInformation implementation
+	// ------------------------------------------------------------------------
+
 	@Override
 	public boolean isBasicType() {
 		return false;
@@ -67,7 +81,9 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
 
 	@Override
 	public int getTotalFields() {
-		return elementTypeInfo.getTotalFields();
+		// similar as arrays, the lists are "opaque" to the direct field addressing logic
+		// since the list's elements are not addressable, we do not expose them
+		return 1;
 	}
 
 	@SuppressWarnings("unchecked")
@@ -87,17 +103,20 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
 		return new ListSerializer<>(elementTypeSerializer);
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
 	public String toString() {
-		return null;
+		return "List<" + elementTypeInfo + '>';
 	}
 
 	@Override
 	public boolean equals(Object obj) {
-		if (obj instanceof ListTypeInfo) {
-			@SuppressWarnings("unchecked")
-			ListTypeInfo<T> other = (ListTypeInfo<T>) obj;
-
+		if (obj == this) {
+			return true;
+		}
+		else if (obj instanceof ListTypeInfo) {
+			final ListTypeInfo<?> other = (ListTypeInfo<?>) obj;
 			return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
 		} else {
 			return false;
@@ -111,6 +130,6 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
 
 	@Override
 	public boolean canEqual(Object obj) {
-		return (obj instanceof ListTypeInfo);
+		return obj != null && obj.getClass() == getClass();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index ea91e56..91c6145 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -73,8 +73,11 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			
 			Class<T> type = getTypeClass();
 			assertNotNull("The test is corrupt: type class is null.", type);
-			
-			assertEquals("Type of the instantiated object is wrong.", type, instance.getClass());
+
+			if (!type.isAssignableFrom(instance.getClass())) {
+				fail("Type of the instantiated object is wrong. " +
+						"Expected Type: " + type + " present type " + instance.getClass());
+			}
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
new file mode 100644
index 0000000..28cdc13
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A test for the {@link ListSerializer}.
+ */
+public class ListSerializerTest extends SerializerTestBase<List<Long>> {
+
+	@Override
+	protected TypeSerializer<List<Long>> createSerializer() {
+		return new ListSerializer<>(LongSerializer.INSTANCE);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected Class<List<Long>> getTypeClass() {
+		return (Class<List<Long>>) (Class<?>) List.class;
+	}
+
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	@Override
+	protected List<Long>[] getTestData() {
+		final Random rnd = new Random(123654789);
+
+		// empty lists
+		final List<Long> list1 = Collections.emptyList(); 
+		final List<Long> list2 = new LinkedList<>();
+		final List<Long> list3 = new ArrayList<>();
+
+		// single element lists
+		final List<Long> list4 = Collections.singletonList(55L);
+		final List<Long> list5 = new LinkedList<>();
+		list5.add(12345L);
+		final List<Long> list6 = new ArrayList<>();
+		list6.add(777888L);
+
+		// longer lists
+		final List<Long> list7 = new LinkedList<>();
+		for (int i = 0; i < rnd.nextInt(200); i++) {
+			list7.add(rnd.nextLong());
+		}
+
+		final List<Long> list8 = new ArrayList<>();
+		for (int i = 0; i < rnd.nextInt(200); i++) {
+			list8.add(rnd.nextLong());
+		}
+
+		return (List<Long>[]) new List[] {
+				list1, list2, list3, list4, list5, list6, list7, list8
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
index 4e83cca..28bc812 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.migration.api.common.state;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -26,11 +26,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
- * A {@link StateDescriptor} for {@link ListState}.
- *
- * @param <T> The type of the values that can be added to the list state.
+ * The old version of the {@link org.apache.flink.api.common.state.ListStateDescriptor}, retained for
+ * serialization backwards compatibility.
  */
-@PublicEvolving
+@Internal
+@Deprecated
 public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 2366342..e386e0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -108,25 +107,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	//  state backend operations
 	// ------------------------------------------------------------------------
 
-	@SuppressWarnings("unchecked")
 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
 			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
 
-		String name = stateDesc.getName();
-		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(name);
-
-		RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
-				new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, stateDesc.getSerializer());
-
-		return tryRegisterStateTable(stateTable, newMetaInfo);
+		return tryRegisterStateTable(
+				stateDesc.getName(), stateDesc.getType(),
+				namespaceSerializer, stateDesc.getSerializer());
 	}
 
 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
-			StateTable<K, N, V> stateTable, RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+			String stateName,
+			StateDescriptor.Type stateType,
+			TypeSerializer<N> namespaceSerializer, 
+			TypeSerializer<V> valueSerializer) {
+
+		final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+				new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+
+		@SuppressWarnings("unchecked")
+		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
 
 		if (stateTable == null) {
 			stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
-			stateTables.put(newMetaInfo.getName(), stateTable);
+			stateTables.put(stateName, stateTable);
 		} else {
 			if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
 				throw new RuntimeException("Trying to access state using incompatible meta info, was " +
@@ -151,8 +154,16 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
+		// the list state does some manual mapping, because the state is typed to the generic
+		// 'List' interface, but we want to use an implementation typed to ArrayList
+		// using a more specialized implementation opens up runtime optimizations
+
+		StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
+				stateDesc.getName(),
+				stateDesc.getType(),
+				namespaceSerializer,
+				new ArrayListSerializer<T>(stateDesc.getElementSerializer()));
 
-		StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
@@ -441,11 +452,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 			}
 
-			// The serializer used in the list states now changes from ArrayListSerializer to ListSerializer.
-			if (stateSerializer instanceof ArrayListSerializer) {
-				stateSerializer = new ListSerializer<>(((ArrayListSerializer<?>) stateSerializer).getElementSerializer());
-			}
-
 			Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
 
 			if (null != nullNameSpaceFix) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index a4e8ea7..02c3067 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -29,19 +29,18 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 /**
  * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
  * into files.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-		extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
+		extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
 		implements InternalListState<N, V> {
 
 	/**
@@ -55,7 +54,7 @@ public class HeapListState<K, N, V>
 	public HeapListState(
 			KeyedStateBackend<K> backend,
 			ListStateDescriptor<V> stateDesc,
-			StateTable<K, N, List<V>> stateTable,
+			StateTable<K, N, ArrayList<V>> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
@@ -70,14 +69,14 @@ public class HeapListState<K, N, V>
 		Preconditions.checkState(currentNamespace != null, "No namespace set.");
 		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
 
-		Map<N, Map<K, List<V>>> namespaceMap =
+		Map<N, Map<K, ArrayList<V>>> namespaceMap =
 				stateTable.get(backend.getCurrentKeyGroupIndex());
 
 		if (namespaceMap == null) {
 			return null;
 		}
 
-		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			return null;
@@ -96,7 +95,7 @@ public class HeapListState<K, N, V>
 			return;
 		}
 
-		Map<N, Map<K, List<V>>> namespaceMap =
+		Map<N, Map<K, ArrayList<V>>> namespaceMap =
 				stateTable.get(backend.getCurrentKeyGroupIndex());
 
 		if (namespaceMap == null) {
@@ -104,14 +103,14 @@ public class HeapListState<K, N, V>
 			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
 		}
 
-		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			keyedMap = createNewMap();
 			namespaceMap.put(currentNamespace, keyedMap);
 		}
 
-		List<V> list = keyedMap.get(backend.<K>getCurrentKey());
+		ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
 
 		if (list == null) {
 			list = new ArrayList<>();
@@ -119,26 +118,26 @@ public class HeapListState<K, N, V>
 		}
 		list.add(value);
 	}
-	
+
 	@Override
 	public byte[] getSerializedValue(K key, N namespace) throws Exception {
 		Preconditions.checkState(namespace != null, "No namespace given.");
 		Preconditions.checkState(key != null, "No key given.");
 
-		Map<N, Map<K, List<V>>> namespaceMap =
+		Map<N, Map<K, ArrayList<V>>> namespaceMap =
 				stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
 
 		if (namespaceMap == null) {
 			return null;
 		}
 
-		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			return null;
 		}
 
-		List<V> result = keyedMap.get(key);
+		ArrayList<V> result = keyedMap.get(key);
 
 		if (result == null) {
 			return null;
@@ -166,8 +165,8 @@ public class HeapListState<K, N, V>
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected List<V> mergeState(List<V> a, List<V> b) {
+	protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
 		a.addAll(b);
 		return a;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index f1b071a..746db28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -28,10 +28,11 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
+
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
@@ -94,7 +95,7 @@ public class HeapListStateTest {
 
 			// make sure all lists / maps are cleared
 
-			StateTable<String, VoidNamespace, List<Long>> stateTable =
+			StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
 					((HeapListState<String, VoidNamespace, Long>) state).stateTable;
 
 			assertTrue(stateTable.isEmpty());
@@ -214,7 +215,7 @@ public class HeapListStateTest {
 			state.setCurrentNamespace(namespace1);
 			state.clear();
 
-			StateTable<String, Integer, List<Long>> stateTable =
+			StateTable<String, Integer, ArrayList<Long>> stateTable =
 					((HeapListState<String, Integer, Long>) state).stateTable;
 
 			assertTrue(stateTable.isEmpty());