You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/15 12:21:12 UTC

[1/2] flink git commit: [FLINK-5790] [core] Use list types when ListStateDescriptor extends StateDescriptor

Repository: flink
Updated Branches:
  refs/heads/master ded7faeab -> 2045cc5f8


[FLINK-5790] [core] Use list types when ListStateDescriptor extends StateDescriptor

This closes #3305


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d47446ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d47446ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d47446ca

Branch: refs/heads/master
Commit: d47446cafffe0d34d89488f6eb860aa139ceb3f1
Parents: ded7fae
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Authored: Tue Feb 14 13:39:30 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 12:21:07 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBListState.java       |   4 +-
 .../api/common/state/ListStateDescriptor.java   |  35 +++--
 .../flink/api/common/state/StateDescriptor.java |  18 +--
 .../common/typeutils/base/ListSerializer.java   | 131 +++++++++++++++++++
 .../flink/api/java/typeutils/ListTypeInfo.java  | 116 ++++++++++++++++
 .../util/MigrationInstantiationUtil.java        |   2 +-
 .../common/state/ListStateDescriptorTest.java   |  25 +++-
 .../api/common/state/ListStateDescriptor.java   | 110 ++++++++++++++++
 .../runtime/state/ArrayListSerializer.java      |  10 +-
 .../state/DefaultOperatorStateBackend.java      |   6 +-
 .../state/heap/HeapKeyedStateBackend.java       |  17 +--
 .../flink/runtime/state/heap/HeapListState.java |  25 ++--
 .../runtime/state/StateBackendTestBase.java     |   2 +-
 .../runtime/state/heap/HeapListStateTest.java   |   8 +-
 ...ccumulatingProcessingTimeWindowOperator.java |   8 +-
 .../operators/StateDescriptorPassingTest.java   |  28 +++-
 .../operators/StreamingRuntimeContextTest.java  |  10 +-
 17 files changed, 476 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index e6988f7..a8b20d1 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -47,7 +47,7 @@ import java.util.List;
  * @param <V> The type of the values in the list state.
  */
 public class RocksDBListState<K, N, V>
-	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, V>
+	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>>
 	implements InternalListState<N, V> {
 
 	/** Serializer for the values */
@@ -72,7 +72,7 @@ public class RocksDBListState<K, N, V>
 			RocksDBKeyedStateBackend<K> backend) {
 
 		super(columnFamily, namespaceSerializer, stateDesc, backend);
-		this.valueSerializer = stateDesc.getSerializer();
+		this.valueSerializer = stateDesc.getElementSerializer();
 
 		writeOptions = new WriteOptions();
 		writeOptions.setDisableWAL(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 6861a07..2047e24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -21,6 +21,10 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+
+import java.util.List;
 
 /**
  * A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
@@ -30,8 +34,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
  * @param <T> The type of the values that can be added to the list state.
  */
 @PublicEvolving
-public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
-	private static final long serialVersionUID = 1L;
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> {
+	private static final long serialVersionUID = 2L;
 
 	/**
 	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
@@ -40,20 +44,22 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
 	 * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
 	 *
 	 * @param name The (unique) name for the state.
-	 * @param typeClass The type of the values in the state.
+	 * @param elementTypeClass The type of the elements in the state.
 	 */
-	public ListStateDescriptor(String name, Class<T> typeClass) {
-		super(name, typeClass, null);
+	@SuppressWarnings("unchecked")
+	public ListStateDescriptor(String name, Class<T> elementTypeClass) {
+		super(name, new ListTypeInfo<>(elementTypeClass), null);
 	}
 
 	/**
 	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
 	 *
 	 * @param name The (unique) name for the state.
-	 * @param typeInfo The type of the values in the state.
+	 * @param elementTypeInfo The type of the elements in the state.
 	 */
-	public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
-		super(name, typeInfo, null);
+	@SuppressWarnings("unchecked")
+	public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {
+		super(name, new ListTypeInfo<>(elementTypeInfo), null);
 	}
 
 	/**
@@ -62,10 +68,19 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
 	 * @param name The (unique) name for the state.
 	 * @param typeSerializer The type serializer for the list values.
 	 */
+	@SuppressWarnings("unchecked")
 	public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
-		super(name, typeSerializer, null);
+		super(name, new ListSerializer<>(typeSerializer), null);
+	}
+
+	public TypeSerializer<T> getElementSerializer() {
+		if (!(serializer instanceof ListSerializer)) {
+			throw new IllegalStateException();
+		}
+
+		return ((ListSerializer<T>)serializer).getElementSerializer();
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index b901d03..bc909e6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -243,22 +243,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 		}
 	}
 
-	/**
-	 * This method should be called by subclasses prior to serialization. Because the TypeInformation is
-	 * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor
-	 * needs to make sure that the serializer is created before the TypeInformation is dropped. 
-	 */
-	private void ensureSerializerCreated() {
-		if (serializer == null) {
-			if (typeInfo != null) {
-				serializer = typeInfo.createSerializer(new ExecutionConfig());
-			} else {
-				throw new IllegalStateException(
-						"Cannot initialize serializer after TypeInformation was dropped during serialization");
-			}
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	//  Standard Utils
 	// ------------------------------------------------------------------------
@@ -287,7 +271,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 
 	private void writeObject(final ObjectOutputStream out) throws IOException {
 		// make sure we have a serializer before the type information gets lost
-		ensureSerializerCreated();
+		initializeSerializerUnlessSet(new ExecutionConfig());
 
 		// write all the non-transient fields
 		out.defaultWriteObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
new file mode 100644
index 0000000..a875a3b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+@SuppressWarnings("ForLoopReplaceableByForEach")
+public class ListSerializer<T> extends TypeSerializer<List<T>> {
+
+	private static final long serialVersionUID = 1119562170939152304L;
+
+	private final TypeSerializer<T> elementSerializer;
+
+	public ListSerializer(TypeSerializer<T> elementSerializer) {
+		this.elementSerializer = elementSerializer;
+	}
+
+	public TypeSerializer<T> getElementSerializer() {
+		return elementSerializer;
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<List<T>> duplicate() {
+		TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+		return duplicateElement == elementSerializer ? this : new ListSerializer<T>(duplicateElement);
+	}
+
+	@Override
+	public List<T> createInstance() {
+		return new ArrayList<>();
+	}
+
+	@Override
+	public List<T> copy(List<T> from) {
+		List<T> newList = new ArrayList<>(from.size());
+		for (int i = 0; i < from.size(); i++) {
+			newList.add(elementSerializer.copy(from.get(i)));
+		}
+		return newList;
+	}
+
+	@Override
+	public List<T> copy(List<T> from, List<T> reuse) {
+		return copy(from);
+	}
+
+	@Override
+	public int getLength() {
+		return -1; // var length
+	}
+
+	@Override
+	public void serialize(List<T> list, DataOutputView target) throws IOException {
+		final int size = list.size();
+		target.writeInt(size);
+		for (int i = 0; i < size; i++) {
+			elementSerializer.serialize(list.get(i), target);
+		}
+	}
+
+	@Override
+	public List<T> deserialize(DataInputView source) throws IOException {
+		final int size = source.readInt();
+		final List<T> list = new ArrayList<>(size);
+		for (int i = 0; i < size; i++) {
+			list.add(elementSerializer.deserialize(source));
+		}
+		return list;
+	}
+
+	@Override
+	public List<T> deserialize(List<T> reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		// copy number of elements
+		final int num = source.readInt();
+		target.writeInt(num);
+		for (int i = 0; i < num; i++) {
+			elementSerializer.copy(source, target);
+		}
+	}
+
+	// --------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj == this ||
+				(obj != null && obj.getClass() == getClass() &&
+						elementSerializer.equals(((ListSerializer<?>) obj).elementSerializer));
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return true;
+	}
+
+	@Override
+	public int hashCode() {
+		return elementSerializer.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
new file mode 100644
index 0000000..e70aaf8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+
+import java.util.List;
+
+/**
+ * A {@link TypeInformation} for the list types of the Java API.
+ *
+ * @param <T> The type of the elements in the list.
+ */
+
+
+@Public
+public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
+
+	private final TypeInformation<T> elementTypeInfo;
+
+	public ListTypeInfo(Class<T> elementTypeClass) {
+		this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+	}
+
+	public ListTypeInfo(TypeInformation<T> elementTypeInfo) {
+		this.elementTypeInfo = elementTypeInfo;
+	}
+
+	public TypeInformation<T> getElementTypeInfo() {
+		return elementTypeInfo;
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 0;
+	}
+
+	@Override
+	public int getTotalFields() {
+		return elementTypeInfo.getTotalFields();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Class<List<T>> getTypeClass() {
+		return (Class<List<T>>)(Class<?>)List.class;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<List<T>> createSerializer(ExecutionConfig config) {
+		TypeSerializer<T> elementTypeSerializer = elementTypeInfo.createSerializer(config);
+		return new ListSerializer<>(elementTypeSerializer);
+	}
+
+	@Override
+	public String toString() {
+		return null;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof ListTypeInfo) {
+			@SuppressWarnings("unchecked")
+			ListTypeInfo<T> other = (ListTypeInfo<T>) obj;
+
+			return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * elementTypeInfo.hashCode() + 1;
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return (obj instanceof ListTypeInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
index ca75cce..d175b2f 100644
--- a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
@@ -91,4 +91,4 @@ public final class MigrationInstantiationUtil {
 		throw new IllegalAccessError();
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index 6dc00f0..b9d9a8c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
@@ -45,13 +46,18 @@ public class ListStateDescriptorTest {
 		
 		assertEquals("testName", descr.getName());
 		assertNotNull(descr.getSerializer());
-		assertEquals(serializer, descr.getSerializer());
+		assertTrue(descr.getSerializer() instanceof ListSerializer);
+		assertNotNull(descr.getElementSerializer());
+		assertEquals(serializer, descr.getElementSerializer());
 
 		ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
 
 		assertEquals("testName", copy.getName());
 		assertNotNull(copy.getSerializer());
-		assertEquals(serializer, copy.getSerializer());
+		assertTrue(copy.getSerializer() instanceof ListSerializer);
+
+		assertNotNull(copy.getElementSerializer());
+		assertEquals(serializer, copy.getElementSerializer());
 	}
 
 	@Test
@@ -69,11 +75,14 @@ public class ListStateDescriptorTest {
 		} catch (IllegalStateException ignored) {}
 
 		descr.initializeSerializerUnlessSet(cfg);
-		
+
 		assertNotNull(descr.getSerializer());
-		assertTrue(descr.getSerializer() instanceof KryoSerializer);
+		assertTrue(descr.getSerializer() instanceof ListSerializer);
 
-		assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+		assertNotNull(descr.getElementSerializer());
+		assertTrue(descr.getElementSerializer() instanceof KryoSerializer);
+
+		assertTrue(((KryoSerializer<?>) descr.getElementSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
 	}
 
 	@Test
@@ -85,7 +94,11 @@ public class ListStateDescriptorTest {
 		ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
 
 		assertEquals("testName", copy.getName());
+
 		assertNotNull(copy.getSerializer());
-		assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+		assertTrue(copy.getSerializer() instanceof ListSerializer);
+
+		assertNotNull(copy.getElementSerializer());
+		assertEquals(StringSerializer.INSTANCE, copy.getElementSerializer());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
new file mode 100644
index 0000000..4e83cca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A {@link StateDescriptor} for {@link ListState}.
+ *
+ * @param <T> The type of the values that can be added to the list state.
+ */
+@PublicEvolving
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+	 *
+	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
+	 * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeClass The type of the values in the state.
+	 */
+	public ListStateDescriptor(String name, Class<T> typeClass) {
+		super(name, typeClass, null);
+	}
+
+	/**
+	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeInfo The type of the values in the state.
+	 */
+	public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
+		super(name, typeInfo, null);
+	}
+
+	/**
+	 * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+	 *
+	 * @param name The (unique) name for the state.
+	 * @param typeSerializer The type serializer for the list values.
+	 */
+	public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
+		super(name, typeSerializer, null);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public ListState<T> bind(StateBackend stateBackend) throws Exception {
+		throw new IllegalStateException("Cannot bind states with a legacy state descriptor.");
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
+
+		return serializer.equals(that.serializer) && name.equals(that.name);
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = serializer.hashCode();
+		result = 31 * result + name.hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "ListStateDescriptor{" +
+				"serializer=" + serializer +
+				'}';
+	}
+
+	@Override
+	public org.apache.flink.api.common.state.StateDescriptor.Type getType() {
+		return org.apache.flink.api.common.state.StateDescriptor.Type.LIST;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 43e6786..f5a6405 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -35,6 +35,10 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 		this.elementSerializer = elementSerializer;
 	}
 
+	public TypeSerializer<T> getElementSerializer() {
+		return elementSerializer;
+	}
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -109,8 +113,8 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 	@Override
 	public boolean equals(Object obj) {
 		return obj == this ||
-			(obj != null && obj.getClass() == getClass() &&
-				elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
+				(obj != null && obj.getClass() == getClass() &&
+						elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
 	}
 
 	@Override
@@ -122,4 +126,4 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
 	public int hashCode() {
 		return elementSerializer.hashCode();
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 1cd1da7..adf0727 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -106,7 +106,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 		Preconditions.checkNotNull(stateDescriptor);
 
 		String name = Preconditions.checkNotNull(stateDescriptor.getName());
-		TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getSerializer());
+		TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
 
 		@SuppressWarnings("unchecked")
 		PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
@@ -126,8 +126,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 							partitionableListState.getAssignmentMode());
 			Preconditions.checkState(
 					partitionableListState.getPartitionStateSerializer().
-							isCompatibleWith(stateDescriptor.getSerializer()),
-					"Incompatible type serializers. Provided: " + stateDescriptor.getSerializer() +
+							isCompatibleWith(stateDescriptor.getElementSerializer()),
+					"Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() +
 							", found: " + partitionableListState.getPartitionStateSerializer());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 0fe92e7..2366342 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.VisibleForTesting;
-
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -28,6 +27,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -60,7 +60,6 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -152,15 +151,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
-		String name = stateDesc.getName();
-
-		@SuppressWarnings("unchecked")
-		StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(name);
 
-		RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo =
-				new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()));
-
-		stateTable = tryRegisterStateTable(stateTable, newMetaInfo);
+		StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
@@ -449,6 +441,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 			}
 
+			// The serializer used in the list states now changes from ArrayListSerializer to ListSerializer.
+			if (stateSerializer instanceof ArrayListSerializer) {
+				stateSerializer = new ListSerializer<>(((ArrayListSerializer<?>) stateSerializer).getElementSerializer());
+			}
+
 			Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
 
 			if (null != nullNameSpaceFix) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index f0eb53e..a4e8ea7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -40,7 +41,7 @@ import java.util.Map;
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-		extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+		extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
 		implements InternalListState<N, V> {
 
 	/**
@@ -54,7 +55,7 @@ public class HeapListState<K, N, V>
 	public HeapListState(
 			KeyedStateBackend<K> backend,
 			ListStateDescriptor<V> stateDesc,
-			StateTable<K, N, ArrayList<V>> stateTable,
+			StateTable<K, N, List<V>> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
@@ -69,14 +70,14 @@ public class HeapListState<K, N, V>
 		Preconditions.checkState(currentNamespace != null, "No namespace set.");
 		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
 
-		Map<N, Map<K, ArrayList<V>>> namespaceMap =
+		Map<N, Map<K, List<V>>> namespaceMap =
 				stateTable.get(backend.getCurrentKeyGroupIndex());
 
 		if (namespaceMap == null) {
 			return null;
 		}
 
-		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			return null;
@@ -95,7 +96,7 @@ public class HeapListState<K, N, V>
 			return;
 		}
 
-		Map<N, Map<K, ArrayList<V>>> namespaceMap =
+		Map<N, Map<K, List<V>>> namespaceMap =
 				stateTable.get(backend.getCurrentKeyGroupIndex());
 
 		if (namespaceMap == null) {
@@ -103,14 +104,14 @@ public class HeapListState<K, N, V>
 			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
 		}
 
-		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			keyedMap = createNewMap();
 			namespaceMap.put(currentNamespace, keyedMap);
 		}
 
-		ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
+		List<V> list = keyedMap.get(backend.<K>getCurrentKey());
 
 		if (list == null) {
 			list = new ArrayList<>();
@@ -124,26 +125,26 @@ public class HeapListState<K, N, V>
 		Preconditions.checkState(namespace != null, "No namespace given.");
 		Preconditions.checkState(key != null, "No key given.");
 
-		Map<N, Map<K, ArrayList<V>>> namespaceMap =
+		Map<N, Map<K, List<V>>> namespaceMap =
 				stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
 
 		if (namespaceMap == null) {
 			return null;
 		}
 
-		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			return null;
 		}
 
-		ArrayList<V> result = keyedMap.get(key);
+		List<V> result = keyedMap.get(key);
 
 		if (result == null) {
 			return null;
 		}
 
-		TypeSerializer<V> serializer = stateDesc.getSerializer();
+		TypeSerializer<V> serializer = stateDesc.getElementSerializer();
 
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
@@ -165,7 +166,7 @@ public class HeapListState<K, N, V>
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
+	protected List<V> mergeState(List<V> a, List<V> b) {
 		a.addAll(b);
 		return a;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b4bf664..7737ecf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -494,7 +494,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 			TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
 			TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
-			TypeSerializer<String> valueSerializer = kvId.getSerializer();
+			TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
 
 			ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 			@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 33d60a0..f1b071a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.api.common.ExecutionConfig;
-
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -29,11 +28,10 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
-
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
@@ -96,7 +94,7 @@ public class HeapListStateTest {
 
 			// make sure all lists / maps are cleared
 
-			StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
+			StateTable<String, VoidNamespace, List<Long>> stateTable =
 					((HeapListState<String, VoidNamespace, Long>) state).stateTable;
 
 			assertTrue(stateTable.isEmpty());
@@ -216,7 +214,7 @@ public class HeapListStateTest {
 			state.setCurrentNamespace(namespace1);
 			state.clear();
 
-			StateTable<String, Integer, ArrayList<Long>> stateTable = 
+			StateTable<String, Integer, List<Long>> stateTable =
 					((HeapListState<String, Integer, Long>) state).stateTable;
 
 			assertTrue(stateTable.isEmpty());

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 90e4b52..7adaf13 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -31,12 +31,12 @@ import java.util.ArrayList;
 
 @Internal
 @Deprecated
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
+public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
-	
+
 	public AccumulatingProcessingTimeWindowOperator(
 			WindowFunction<IN, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
@@ -53,7 +53,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
 		@SuppressWarnings("unchecked")
 		WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
-		
+
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index c0ca6a0..26cb7ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -23,8 +23,10 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -130,7 +132,7 @@ public class StateDescriptorPassingTest {
 										Iterable<File> input, Collector<String> out) {}
 				});
 
-		validateStateDescriptorConfigured(result);
+		validateListStateDescriptorConfigured(result);
 	}
 
 	@Test
@@ -190,7 +192,7 @@ public class StateDescriptorPassingTest {
 					public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
 				});
 
-		validateStateDescriptorConfigured(result);
+		validateListStateDescriptorConfigured(result);
 	}
 
 	// ------------------------------------------------------------------------
@@ -211,4 +213,26 @@ public class StateDescriptorPassingTest {
 		assertTrue("serializer registration was not properly passed on", 
 				kryo.getSerializer(File.class) instanceof JavaSerializer);
 	}
+
+	private void validateListStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
+		OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation();
+		WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator();
+		StateDescriptor<?, ?> descr = op.getStateDescriptor();
+
+		assertTrue(descr instanceof ListStateDescriptor);
+
+		ListStateDescriptor<?> listDescr = (ListStateDescriptor<?>)descr;
+
+		// this would be the first statement to fail if state descriptors were not properly initialized
+		TypeSerializer<?> serializer = listDescr.getSerializer();
+		assertTrue(serializer instanceof ListSerializer);
+
+		TypeSerializer<?> elementSerializer = listDescr.getElementSerializer();
+		assertTrue(elementSerializer instanceof KryoSerializer);
+
+		Kryo kryo = ((KryoSerializer<?>) elementSerializer).getKryo();
+
+		assertTrue("serializer registration was not properly passed on",
+				kryo.getSerializer(File.class) instanceof JavaSerializer);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 2791726..294b8da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
@@ -162,12 +163,15 @@ public class StreamingRuntimeContextTest {
 		ListStateDescriptor<TaskInfo> descr = new ListStateDescriptor<>("name", TaskInfo.class);
 		context.getListState(descr);
 
-		StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get();
+		ListStateDescriptor<?> descrIntercepted = (ListStateDescriptor<?>) descriptorCapture.get();
 		TypeSerializer<?> serializer = descrIntercepted.getSerializer();
 
 		// check that the Path class is really registered, i.e., the execution config was applied
-		assertTrue(serializer instanceof KryoSerializer);
-		assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
+		assertTrue(serializer instanceof ListSerializer);
+
+		TypeSerializer<?> elementSerializer = descrIntercepted.getElementSerializer();
+		assertTrue(elementSerializer instanceof KryoSerializer);
+		assertTrue(((KryoSerializer<?>) elementSerializer).getKryo().getRegistration(Path.class).getId() > 0);
 	}
 
 	@Test


[2/2] flink git commit: [FLINK-5790] [core] Followups and tests for the StateDescriptor changes

Posted by se...@apache.org.
[FLINK-5790] [core] Followups and tests for the StateDescriptor changes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2045cc5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2045cc5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2045cc5f

Branch: refs/heads/master
Commit: 2045cc5f84ab39f18f423154c5620a79ac6d44ba
Parents: d47446c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 14 15:32:03 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 13:10:30 2017 +0100

----------------------------------------------------------------------
 .../api/common/state/ListStateDescriptor.java   | 45 +++++++----
 .../common/typeutils/base/ListSerializer.java   | 50 ++++++++++--
 .../flink/api/java/typeutils/ListTypeInfo.java  | 45 ++++++++---
 .../common/typeutils/SerializerTestBase.java    |  7 +-
 .../typeutils/base/ListSerializerTest.java      | 83 ++++++++++++++++++++
 .../api/common/state/ListStateDescriptor.java   | 10 +--
 .../state/heap/HeapKeyedStateBackend.java       | 40 ++++++----
 .../flink/runtime/state/heap/HeapListState.java | 29 ++++---
 .../runtime/state/heap/HeapListStateTest.java   |  7 +-
 9 files changed, 237 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 2047e24..c03f8cb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -27,8 +27,14 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import java.util.List;
 
 /**
- * A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
- * list state using
+ * A {@link StateDescriptor} for {@link ListState}. This can be used to create state where the type
+ * is a list that can be appended and iterated over.
+ * 
+ * <p>Using {@code ListState} is typically more efficient than manually maintaining a list in a
+ * {@link ValueState}, because the backing implementation can support efficient appends, rathern then
+ * replacing the full list on write.
+ * 
+ * <p>To create keyed list state (on a KeyedStream), use 
  * {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}.
  *
  * @param <T> The type of the values that can be added to the list state.
@@ -46,7 +52,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	 * @param name The (unique) name for the state.
 	 * @param elementTypeClass The type of the elements in the state.
 	 */
-	@SuppressWarnings("unchecked")
 	public ListStateDescriptor(String name, Class<T> elementTypeClass) {
 		super(name, new ListTypeInfo<>(elementTypeClass), null);
 	}
@@ -57,7 +62,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	 * @param name The (unique) name for the state.
 	 * @param elementTypeInfo The type of the elements in the state.
 	 */
-	@SuppressWarnings("unchecked")
 	public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {
 		super(name, new ListTypeInfo<>(elementTypeInfo), null);
 	}
@@ -68,26 +72,39 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	 * @param name The (unique) name for the state.
 	 * @param typeSerializer The type serializer for the list values.
 	 */
-	@SuppressWarnings("unchecked")
 	public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
 		super(name, new ListSerializer<>(typeSerializer), null);
 	}
 
+	// ------------------------------------------------------------------------
+
+	@Override
+	public ListState<T> bind(StateBackend stateBackend) throws Exception {
+		return stateBackend.createListState(this);
+	}
+
+	/**
+	 * Gets the serializer for the elements contained in the list.
+	 * 
+	 * @return The serializer for the elements in the list.
+	 */
 	public TypeSerializer<T> getElementSerializer() {
-		if (!(serializer instanceof ListSerializer)) {
+		// call getSerializer() here to get the initialization check and proper error message
+		final TypeSerializer<List<T>> rawSerializer = getSerializer();
+		if (!(rawSerializer instanceof ListSerializer)) {
 			throw new IllegalStateException();
 		}
 
 		return ((ListSerializer<T>)serializer).getElementSerializer();
 	}
 
-	// ------------------------------------------------------------------------
-
 	@Override
-	public ListState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createListState(this);
+	public Type getType() {
+		return Type.LIST;
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -97,8 +114,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 			return false;
 		}
 
-		ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
+		final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
 		return serializer.equals(that.serializer) && name.equals(that.name);
 
 	}
@@ -116,9 +132,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 				"serializer=" + serializer +
 				'}';
 	}
-
-	@Override
-	public Type getType() {
-		return Type.LIST;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index a875a3b..ca3c143 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -26,21 +26,49 @@ import java.io.IOException;
 import java.util.List;
 import java.util.ArrayList;
 
-@SuppressWarnings("ForLoopReplaceableByForEach")
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link List Lists}. The serializer relies on an element serializer
+ * for teh serialization of the list's elements.
+ * 
+ * <p>The serialization format for the list is as follows: four bytes for the length of the lost,
+ * followed by the serialized representation of each element.
+ * 
+ * @param <T> The type of element in the list.
+ */
 public class ListSerializer<T> extends TypeSerializer<List<T>> {
 
 	private static final long serialVersionUID = 1119562170939152304L;
 
+	/** The serializer for the elements of the list */
 	private final TypeSerializer<T> elementSerializer;
 
+	/**
+	 * Creates a list serializer that uses the given serializer to serialize the list's elements.
+	 * 
+	 * @param elementSerializer The serializer for the elements of the list
+	 */
 	public ListSerializer(TypeSerializer<T> elementSerializer) {
-		this.elementSerializer = elementSerializer;
+		this.elementSerializer = checkNotNull(elementSerializer);
 	}
 
+	// ------------------------------------------------------------------------
+	//  ListSerializer specific properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the serializer for the elements of the list.
+	 * @return The serializer for the elements of the list
+	 */
 	public TypeSerializer<T> getElementSerializer() {
 		return elementSerializer;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Type Serializer implementation
+	// ------------------------------------------------------------------------
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -54,14 +82,18 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> {
 
 	@Override
 	public List<T> createInstance() {
-		return new ArrayList<>();
+		return new ArrayList<>(0);
 	}
 
 	@Override
 	public List<T> copy(List<T> from) {
 		List<T> newList = new ArrayList<>(from.size());
-		for (int i = 0; i < from.size(); i++) {
-			newList.add(elementSerializer.copy(from.get(i)));
+
+		// We iterate here rather than accessing by index, because we cannot be sure that
+		// the given list supports RandomAccess.
+		// The Iterator should be stack allocated on new JVMs (due to escape analysis)
+		for (T element : from) {
+			newList.add(elementSerializer.copy(element));
 		}
 		return newList;
 	}
@@ -80,8 +112,12 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> {
 	public void serialize(List<T> list, DataOutputView target) throws IOException {
 		final int size = list.size();
 		target.writeInt(size);
-		for (int i = 0; i < size; i++) {
-			elementSerializer.serialize(list.get(i), target);
+
+		// We iterate here rather than accessing by index, because we cannot be sure that
+		// the given list supports RandomAccess.
+		// The Iterator should be stack allocated on new JVMs (due to escape analysis)
+		for (T element : list) {
+			elementSerializer.serialize(element, target);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
index e70aaf8..763be98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -26,30 +26,44 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
 
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link TypeInformation} for the list types of the Java API.
  *
  * @param <T> The type of the elements in the list.
  */
-
-
-@Public
+@PublicEvolving
 public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
 
+	private static final long serialVersionUID = 1L;
+
 	private final TypeInformation<T> elementTypeInfo;
 
+
 	public ListTypeInfo(Class<T> elementTypeClass) {
-		this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+		this.elementTypeInfo = of(checkNotNull(elementTypeClass, "elementTypeClass"));
 	}
 
 	public ListTypeInfo(TypeInformation<T> elementTypeInfo) {
-		this.elementTypeInfo = elementTypeInfo;
+		this.elementTypeInfo = checkNotNull(elementTypeInfo, "elementTypeInfo");
 	}
 
+	// ------------------------------------------------------------------------
+	//  ListTypeInfo specific properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the type information for the elements contained in the list
+	 */
 	public TypeInformation<T> getElementTypeInfo() {
 		return elementTypeInfo;
 	}
 
+	// ------------------------------------------------------------------------
+	//  TypeInformation implementation
+	// ------------------------------------------------------------------------
+
 	@Override
 	public boolean isBasicType() {
 		return false;
@@ -67,7 +81,9 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
 
 	@Override
 	public int getTotalFields() {
-		return elementTypeInfo.getTotalFields();
+		// similar as arrays, the lists are "opaque" to the direct field addressing logic
+		// since the list's elements are not addressable, we do not expose them
+		return 1;
 	}
 
 	@SuppressWarnings("unchecked")
@@ -87,17 +103,20 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
 		return new ListSerializer<>(elementTypeSerializer);
 	}
 
+	// ------------------------------------------------------------------------
+
 	@Override
 	public String toString() {
-		return null;
+		return "List<" + elementTypeInfo + '>';
 	}
 
 	@Override
 	public boolean equals(Object obj) {
-		if (obj instanceof ListTypeInfo) {
-			@SuppressWarnings("unchecked")
-			ListTypeInfo<T> other = (ListTypeInfo<T>) obj;
-
+		if (obj == this) {
+			return true;
+		}
+		else if (obj instanceof ListTypeInfo) {
+			final ListTypeInfo<?> other = (ListTypeInfo<?>) obj;
 			return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
 		} else {
 			return false;
@@ -111,6 +130,6 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
 
 	@Override
 	public boolean canEqual(Object obj) {
-		return (obj instanceof ListTypeInfo);
+		return obj != null && obj.getClass() == getClass();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index ea91e56..91c6145 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -73,8 +73,11 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			
 			Class<T> type = getTypeClass();
 			assertNotNull("The test is corrupt: type class is null.", type);
-			
-			assertEquals("Type of the instantiated object is wrong.", type, instance.getClass());
+
+			if (!type.isAssignableFrom(instance.getClass())) {
+				fail("Type of the instantiated object is wrong. " +
+						"Expected Type: " + type + " present type " + instance.getClass());
+			}
 		}
 		catch (Exception e) {
 			System.err.println(e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
new file mode 100644
index 0000000..28cdc13
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A test for the {@link LongSerializer}.
+ */
+public class ListSerializerTest extends SerializerTestBase<List<Long>> {
+
+	@Override
+	protected TypeSerializer<List<Long>> createSerializer() {
+		return new ListSerializer<>(LongSerializer.INSTANCE);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected Class<List<Long>> getTypeClass() {
+		return (Class<List<Long>>) (Class<?>) List.class;
+	}
+
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	@Override
+	protected List<Long>[] getTestData() {
+		final Random rnd = new Random(123654789);
+
+		// empty lists
+		final List<Long> list1 = Collections.emptyList(); 
+		final List<Long> list2 = new LinkedList<>();
+		final List<Long> list3 = new ArrayList<>();
+
+		// single element lists
+		final List<Long> list4 = Collections.singletonList(55L);
+		final List<Long> list5 = new LinkedList<>();
+		list5.add(12345L);
+		final List<Long> list6 = new ArrayList<>();
+		list6.add(777888L);
+
+		// longer lists
+		final List<Long> list7 = new LinkedList<>();
+		for (int i = 0; i < rnd.nextInt(200); i++) {
+			list7.add(rnd.nextLong());
+		}
+
+		final List<Long> list8 = new ArrayList<>();
+		for (int i = 0; i < rnd.nextInt(200); i++) {
+			list8.add(rnd.nextLong());
+		}
+
+		return (List<Long>[]) new List[] {
+				list1, list2, list3, list4, list5, list6, list7, list8
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
index 4e83cca..28bc812 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.migration.api.common.state;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -26,11 +26,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
 /**
- * A {@link StateDescriptor} for {@link ListState}.
- *
- * @param <T> The type of the values that can be added to the list state.
+ * The old version of the {@link org.apache.flink.api.common.state.ListStateDescriptor}, retained for
+ * serialization backwards compatibility.
  */
-@PublicEvolving
+@Internal
+@Deprecated
 public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 2366342..e386e0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -108,25 +107,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	//  state backend operations
 	// ------------------------------------------------------------------------
 
-	@SuppressWarnings("unchecked")
 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
 			TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
 
-		String name = stateDesc.getName();
-		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(name);
-
-		RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
-				new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, stateDesc.getSerializer());
-
-		return tryRegisterStateTable(stateTable, newMetaInfo);
+		return tryRegisterStateTable(
+				stateDesc.getName(), stateDesc.getType(),
+				namespaceSerializer, stateDesc.getSerializer());
 	}
 
 	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
-			StateTable<K, N, V> stateTable, RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+			String stateName,
+			StateDescriptor.Type stateType,
+			TypeSerializer<N> namespaceSerializer, 
+			TypeSerializer<V> valueSerializer) {
+
+		final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+				new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+
+		@SuppressWarnings("unchecked")
+		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
 
 		if (stateTable == null) {
 			stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
-			stateTables.put(newMetaInfo.getName(), stateTable);
+			stateTables.put(stateName, stateTable);
 		} else {
 			if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
 				throw new RuntimeException("Trying to access state using incompatible meta info, was " +
@@ -151,8 +154,16 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			TypeSerializer<N> namespaceSerializer,
 			ListStateDescriptor<T> stateDesc) throws Exception {
 
+		// the list state does some manual mapping, because the state is typed to the generic
+		// 'List' interface, but we want to use an implementation typed to ArrayList
+		// using a more specialized implementation opens up runtime optimizations
+
+		StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
+				stateDesc.getName(),
+				stateDesc.getType(),
+				namespaceSerializer,
+				new ArrayListSerializer<T>(stateDesc.getElementSerializer()));
 
-		StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
 		return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
 	}
 
@@ -441,11 +452,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
 			}
 
-			// The serializer used in the list states now changes from ArrayListSerializer to ListSerializer.
-			if (stateSerializer instanceof ArrayListSerializer) {
-				stateSerializer = new ListSerializer<>(((ArrayListSerializer<?>) stateSerializer).getElementSerializer());
-			}
-
 			Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
 
 			if (null != nullNameSpaceFix) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index a4e8ea7..02c3067 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -29,19 +29,18 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 /**
  * Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
  * into files.
- * 
+ *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
  */
 public class HeapListState<K, N, V>
-		extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
+		extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
 		implements InternalListState<N, V> {
 
 	/**
@@ -55,7 +54,7 @@ public class HeapListState<K, N, V>
 	public HeapListState(
 			KeyedStateBackend<K> backend,
 			ListStateDescriptor<V> stateDesc,
-			StateTable<K, N, List<V>> stateTable,
+			StateTable<K, N, ArrayList<V>> stateTable,
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer) {
 		super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
@@ -70,14 +69,14 @@ public class HeapListState<K, N, V>
 		Preconditions.checkState(currentNamespace != null, "No namespace set.");
 		Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
 
-		Map<N, Map<K, List<V>>> namespaceMap =
+		Map<N, Map<K, ArrayList<V>>> namespaceMap =
 				stateTable.get(backend.getCurrentKeyGroupIndex());
 
 		if (namespaceMap == null) {
 			return null;
 		}
 
-		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			return null;
@@ -96,7 +95,7 @@ public class HeapListState<K, N, V>
 			return;
 		}
 
-		Map<N, Map<K, List<V>>> namespaceMap =
+		Map<N, Map<K, ArrayList<V>>> namespaceMap =
 				stateTable.get(backend.getCurrentKeyGroupIndex());
 
 		if (namespaceMap == null) {
@@ -104,14 +103,14 @@ public class HeapListState<K, N, V>
 			stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
 		}
 
-		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			keyedMap = createNewMap();
 			namespaceMap.put(currentNamespace, keyedMap);
 		}
 
-		List<V> list = keyedMap.get(backend.<K>getCurrentKey());
+		ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
 
 		if (list == null) {
 			list = new ArrayList<>();
@@ -119,26 +118,26 @@ public class HeapListState<K, N, V>
 		}
 		list.add(value);
 	}
-	
+
 	@Override
 	public byte[] getSerializedValue(K key, N namespace) throws Exception {
 		Preconditions.checkState(namespace != null, "No namespace given.");
 		Preconditions.checkState(key != null, "No key given.");
 
-		Map<N, Map<K, List<V>>> namespaceMap =
+		Map<N, Map<K, ArrayList<V>>> namespaceMap =
 				stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
 
 		if (namespaceMap == null) {
 			return null;
 		}
 
-		Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+		Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
 
 		if (keyedMap == null) {
 			return null;
 		}
 
-		List<V> result = keyedMap.get(key);
+		ArrayList<V> result = keyedMap.get(key);
 
 		if (result == null) {
 			return null;
@@ -166,8 +165,8 @@ public class HeapListState<K, N, V>
 	// ------------------------------------------------------------------------
 
 	@Override
-	protected List<V> mergeState(List<V> a, List<V> b) {
+	protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
 		a.addAll(b);
 		return a;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index f1b071a..746db28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -28,10 +28,11 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
+
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import static java.util.Arrays.asList;
@@ -94,7 +95,7 @@ public class HeapListStateTest {
 
 			// make sure all lists / maps are cleared
 
-			StateTable<String, VoidNamespace, List<Long>> stateTable =
+			StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
 					((HeapListState<String, VoidNamespace, Long>) state).stateTable;
 
 			assertTrue(stateTable.isEmpty());
@@ -214,7 +215,7 @@ public class HeapListStateTest {
 			state.setCurrentNamespace(namespace1);
 			state.clear();
 
-			StateTable<String, Integer, List<Long>> stateTable =
+			StateTable<String, Integer, ArrayList<Long>> stateTable =
 					((HeapListState<String, Integer, Long>) state).stateTable;
 
 			assertTrue(stateTable.isEmpty());