You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/15 12:21:13 UTC
[2/2] flink git commit: [FLINK-5790] [core] Followups and tests for
the StateDescriptor changes
[FLINK-5790] [core] Followups and tests for the StateDescriptor changes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2045cc5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2045cc5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2045cc5f
Branch: refs/heads/master
Commit: 2045cc5f84ab39f18f423154c5620a79ac6d44ba
Parents: d47446c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 14 15:32:03 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 13:10:30 2017 +0100
----------------------------------------------------------------------
.../api/common/state/ListStateDescriptor.java | 45 +++++++----
.../common/typeutils/base/ListSerializer.java | 50 ++++++++++--
.../flink/api/java/typeutils/ListTypeInfo.java | 45 ++++++++---
.../common/typeutils/SerializerTestBase.java | 7 +-
.../typeutils/base/ListSerializerTest.java | 83 ++++++++++++++++++++
.../api/common/state/ListStateDescriptor.java | 10 +--
.../state/heap/HeapKeyedStateBackend.java | 40 ++++++----
.../flink/runtime/state/heap/HeapListState.java | 29 ++++---
.../runtime/state/heap/HeapListStateTest.java | 7 +-
9 files changed, 237 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 2047e24..c03f8cb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -27,8 +27,14 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo;
import java.util.List;
/**
- * A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
- * list state using
+ * A {@link StateDescriptor} for {@link ListState}. This can be used to create state where the type
+ * is a list that can be appended and iterated over.
+ *
+ * <p>Using {@code ListState} is typically more efficient than manually maintaining a list in a
+ * {@link ValueState}, because the backing implementation can support efficient appends, rathern then
+ * replacing the full list on write.
+ *
+ * <p>To create keyed list state (on a KeyedStream), use
* {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}.
*
* @param <T> The type of the values that can be added to the list state.
@@ -46,7 +52,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
* @param name The (unique) name for the state.
* @param elementTypeClass The type of the elements in the state.
*/
- @SuppressWarnings("unchecked")
public ListStateDescriptor(String name, Class<T> elementTypeClass) {
super(name, new ListTypeInfo<>(elementTypeClass), null);
}
@@ -57,7 +62,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
* @param name The (unique) name for the state.
* @param elementTypeInfo The type of the elements in the state.
*/
- @SuppressWarnings("unchecked")
public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {
super(name, new ListTypeInfo<>(elementTypeInfo), null);
}
@@ -68,26 +72,39 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
* @param name The (unique) name for the state.
* @param typeSerializer The type serializer for the list values.
*/
- @SuppressWarnings("unchecked")
public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
super(name, new ListSerializer<>(typeSerializer), null);
}
+ // ------------------------------------------------------------------------
+
+ @Override
+ public ListState<T> bind(StateBackend stateBackend) throws Exception {
+ return stateBackend.createListState(this);
+ }
+
+ /**
+ * Gets the serializer for the elements contained in the list.
+ *
+ * @return The serializer for the elements in the list.
+ */
public TypeSerializer<T> getElementSerializer() {
- if (!(serializer instanceof ListSerializer)) {
+ // call getSerializer() here to get the initialization check and proper error message
+ final TypeSerializer<List<T>> rawSerializer = getSerializer();
+ if (!(rawSerializer instanceof ListSerializer)) {
throw new IllegalStateException();
}
return ((ListSerializer<T>)serializer).getElementSerializer();
}
- // ------------------------------------------------------------------------
-
@Override
- public ListState<T> bind(StateBackend stateBackend) throws Exception {
- return stateBackend.createListState(this);
+ public Type getType() {
+ return Type.LIST;
}
+ // ------------------------------------------------------------------------
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -97,8 +114,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
return false;
}
- ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
+ final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
return serializer.equals(that.serializer) && name.equals(that.name);
}
@@ -116,9 +132,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
"serializer=" + serializer +
'}';
}
-
- @Override
- public Type getType() {
- return Type.LIST;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index a875a3b..ca3c143 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -26,21 +26,49 @@ import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
-@SuppressWarnings("ForLoopReplaceableByForEach")
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link List Lists}. The serializer relies on an element serializer
+ * for teh serialization of the list's elements.
+ *
+ * <p>The serialization format for the list is as follows: four bytes for the length of the lost,
+ * followed by the serialized representation of each element.
+ *
+ * @param <T> The type of element in the list.
+ */
public class ListSerializer<T> extends TypeSerializer<List<T>> {
private static final long serialVersionUID = 1119562170939152304L;
+ /** The serializer for the elements of the list */
private final TypeSerializer<T> elementSerializer;
+ /**
+ * Creates a list serializer that uses the given serializer to serialize the list's elements.
+ *
+ * @param elementSerializer The serializer for the elements of the list
+ */
public ListSerializer(TypeSerializer<T> elementSerializer) {
- this.elementSerializer = elementSerializer;
+ this.elementSerializer = checkNotNull(elementSerializer);
}
+ // ------------------------------------------------------------------------
+ // ListSerializer specific properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the serializer for the elements of the list.
+ * @return The serializer for the elements of the list
+ */
public TypeSerializer<T> getElementSerializer() {
return elementSerializer;
}
+ // ------------------------------------------------------------------------
+ // Type Serializer implementation
+ // ------------------------------------------------------------------------
+
@Override
public boolean isImmutableType() {
return false;
@@ -54,14 +82,18 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> {
@Override
public List<T> createInstance() {
- return new ArrayList<>();
+ return new ArrayList<>(0);
}
@Override
public List<T> copy(List<T> from) {
List<T> newList = new ArrayList<>(from.size());
- for (int i = 0; i < from.size(); i++) {
- newList.add(elementSerializer.copy(from.get(i)));
+
+ // We iterate here rather than accessing by index, because we cannot be sure that
+ // the given list supports RandomAccess.
+ // The Iterator should be stack allocated on new JVMs (due to escape analysis)
+ for (T element : from) {
+ newList.add(elementSerializer.copy(element));
}
return newList;
}
@@ -80,8 +112,12 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> {
public void serialize(List<T> list, DataOutputView target) throws IOException {
final int size = list.size();
target.writeInt(size);
- for (int i = 0; i < size; i++) {
- elementSerializer.serialize(list.get(i), target);
+
+ // We iterate here rather than accessing by index, because we cannot be sure that
+ // the given list supports RandomAccess.
+ // The Iterator should be stack allocated on new JVMs (due to escape analysis)
+ for (T element : list) {
+ elementSerializer.serialize(element, target);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
index e70aaf8..763be98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.typeutils;
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -26,30 +26,44 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link TypeInformation} for the list types of the Java API.
*
* @param <T> The type of the elements in the list.
*/
-
-
-@Public
+@PublicEvolving
public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
+ private static final long serialVersionUID = 1L;
+
private final TypeInformation<T> elementTypeInfo;
+
public ListTypeInfo(Class<T> elementTypeClass) {
- this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+ this.elementTypeInfo = of(checkNotNull(elementTypeClass, "elementTypeClass"));
}
public ListTypeInfo(TypeInformation<T> elementTypeInfo) {
- this.elementTypeInfo = elementTypeInfo;
+ this.elementTypeInfo = checkNotNull(elementTypeInfo, "elementTypeInfo");
}
+ // ------------------------------------------------------------------------
+ // ListTypeInfo specific properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the type information for the elements contained in the list
+ */
public TypeInformation<T> getElementTypeInfo() {
return elementTypeInfo;
}
+ // ------------------------------------------------------------------------
+ // TypeInformation implementation
+ // ------------------------------------------------------------------------
+
@Override
public boolean isBasicType() {
return false;
@@ -67,7 +81,9 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
@Override
public int getTotalFields() {
- return elementTypeInfo.getTotalFields();
+ // similar as arrays, the lists are "opaque" to the direct field addressing logic
+ // since the list's elements are not addressable, we do not expose them
+ return 1;
}
@SuppressWarnings("unchecked")
@@ -87,17 +103,20 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
return new ListSerializer<>(elementTypeSerializer);
}
+ // ------------------------------------------------------------------------
+
@Override
public String toString() {
- return null;
+ return "List<" + elementTypeInfo + '>';
}
@Override
public boolean equals(Object obj) {
- if (obj instanceof ListTypeInfo) {
- @SuppressWarnings("unchecked")
- ListTypeInfo<T> other = (ListTypeInfo<T>) obj;
-
+ if (obj == this) {
+ return true;
+ }
+ else if (obj instanceof ListTypeInfo) {
+ final ListTypeInfo<?> other = (ListTypeInfo<?>) obj;
return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
} else {
return false;
@@ -111,6 +130,6 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
@Override
public boolean canEqual(Object obj) {
- return (obj instanceof ListTypeInfo);
+ return obj != null && obj.getClass() == getClass();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index ea91e56..91c6145 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -73,8 +73,11 @@ public abstract class SerializerTestBase<T> extends TestLogger {
Class<T> type = getTypeClass();
assertNotNull("The test is corrupt: type class is null.", type);
-
- assertEquals("Type of the instantiated object is wrong.", type, instance.getClass());
+
+ if (!type.isAssignableFrom(instance.getClass())) {
+ fail("Type of the instantiated object is wrong. " +
+ "Expected Type: " + type + " present type " + instance.getClass());
+ }
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
new file mode 100644
index 0000000..28cdc13
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A test for the {@link LongSerializer}.
+ */
+public class ListSerializerTest extends SerializerTestBase<List<Long>> {
+
+ @Override
+ protected TypeSerializer<List<Long>> createSerializer() {
+ return new ListSerializer<>(LongSerializer.INSTANCE);
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Class<List<Long>> getTypeClass() {
+ return (Class<List<Long>>) (Class<?>) List.class;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ protected List<Long>[] getTestData() {
+ final Random rnd = new Random(123654789);
+
+ // empty lists
+ final List<Long> list1 = Collections.emptyList();
+ final List<Long> list2 = new LinkedList<>();
+ final List<Long> list3 = new ArrayList<>();
+
+ // single element lists
+ final List<Long> list4 = Collections.singletonList(55L);
+ final List<Long> list5 = new LinkedList<>();
+ list5.add(12345L);
+ final List<Long> list6 = new ArrayList<>();
+ list6.add(777888L);
+
+ // longer lists
+ final List<Long> list7 = new LinkedList<>();
+ for (int i = 0; i < rnd.nextInt(200); i++) {
+ list7.add(rnd.nextLong());
+ }
+
+ final List<Long> list8 = new ArrayList<>();
+ for (int i = 0; i < rnd.nextInt(200); i++) {
+ list8.add(rnd.nextLong());
+ }
+
+ return (List<Long>[]) new List[] {
+ list1, list2, list3, list4, list5, list6, list7, list8
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
index 4e83cca..28bc812 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -18,7 +18,7 @@
package org.apache.flink.migration.api.common.state;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.StateBackend;
import org.apache.flink.api.common.state.StateDescriptor;
@@ -26,11 +26,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
/**
- * A {@link StateDescriptor} for {@link ListState}.
- *
- * @param <T> The type of the values that can be added to the list state.
+ * The old version of the {@link org.apache.flink.api.common.state.ListStateDescriptor}, retained for
+ * serialization backwards compatibility.
*/
-@PublicEvolving
+@Internal
+@Deprecated
public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 2366342..e386e0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -108,25 +107,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// state backend operations
// ------------------------------------------------------------------------
- @SuppressWarnings("unchecked")
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
- String name = stateDesc.getName();
- StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(name);
-
- RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
- new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, stateDesc.getSerializer());
-
- return tryRegisterStateTable(stateTable, newMetaInfo);
+ return tryRegisterStateTable(
+ stateDesc.getName(), stateDesc.getType(),
+ namespaceSerializer, stateDesc.getSerializer());
}
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
- StateTable<K, N, V> stateTable, RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+ String stateName,
+ StateDescriptor.Type stateType,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<V> valueSerializer) {
+
+ final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+
+ @SuppressWarnings("unchecked")
+ StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
if (stateTable == null) {
stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
- stateTables.put(newMetaInfo.getName(), stateTable);
+ stateTables.put(stateName, stateTable);
} else {
if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
throw new RuntimeException("Trying to access state using incompatible meta info, was " +
@@ -151,8 +154,16 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
+ // the list state does some manual mapping, because the state is typed to the generic
+ // 'List' interface, but we want to use an implementation typed to ArrayList
+ // using a more specialized implementation opens up runtime optimizations
+
+ StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
+ stateDesc.getName(),
+ stateDesc.getType(),
+ namespaceSerializer,
+ new ArrayListSerializer<T>(stateDesc.getElementSerializer()));
- StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@@ -441,11 +452,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
}
- // The serializer used in the list states now changes from ArrayListSerializer to ListSerializer.
- if (stateSerializer instanceof ArrayListSerializer) {
- stateSerializer = new ListSerializer<>(((ArrayListSerializer<?>) stateSerializer).getElementSerializer());
- }
-
Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
if (null != nullNameSpaceFix) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index a4e8ea7..02c3067 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -29,19 +29,18 @@ import org.apache.flink.util.Preconditions;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
/**
* Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
* into files.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of the value.
*/
public class HeapListState<K, N, V>
- extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
+ extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
implements InternalListState<N, V> {
/**
@@ -55,7 +54,7 @@ public class HeapListState<K, N, V>
public HeapListState(
KeyedStateBackend<K> backend,
ListStateDescriptor<V> stateDesc,
- StateTable<K, N, List<V>> stateTable,
+ StateTable<K, N, ArrayList<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
@@ -70,14 +69,14 @@ public class HeapListState<K, N, V>
Preconditions.checkState(currentNamespace != null, "No namespace set.");
Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
- Map<N, Map<K, List<V>>> namespaceMap =
+ Map<N, Map<K, ArrayList<V>>> namespaceMap =
stateTable.get(backend.getCurrentKeyGroupIndex());
if (namespaceMap == null) {
return null;
}
- Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
return null;
@@ -96,7 +95,7 @@ public class HeapListState<K, N, V>
return;
}
- Map<N, Map<K, List<V>>> namespaceMap =
+ Map<N, Map<K, ArrayList<V>>> namespaceMap =
stateTable.get(backend.getCurrentKeyGroupIndex());
if (namespaceMap == null) {
@@ -104,14 +103,14 @@ public class HeapListState<K, N, V>
stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
}
- Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
keyedMap = createNewMap();
namespaceMap.put(currentNamespace, keyedMap);
}
- List<V> list = keyedMap.get(backend.<K>getCurrentKey());
+ ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
if (list == null) {
list = new ArrayList<>();
@@ -119,26 +118,26 @@ public class HeapListState<K, N, V>
}
list.add(value);
}
-
+
@Override
public byte[] getSerializedValue(K key, N namespace) throws Exception {
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");
- Map<N, Map<K, List<V>>> namespaceMap =
+ Map<N, Map<K, ArrayList<V>>> namespaceMap =
stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
if (namespaceMap == null) {
return null;
}
- Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
return null;
}
- List<V> result = keyedMap.get(key);
+ ArrayList<V> result = keyedMap.get(key);
if (result == null) {
return null;
@@ -166,8 +165,8 @@ public class HeapListState<K, N, V>
// ------------------------------------------------------------------------
@Override
- protected List<V> mergeState(List<V> a, List<V> b) {
+ protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
a.addAll(b);
return a;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index f1b071a..746db28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -28,10 +28,11 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
+
import org.junit.Test;
+import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import static java.util.Arrays.asList;
@@ -94,7 +95,7 @@ public class HeapListStateTest {
// make sure all lists / maps are cleared
- StateTable<String, VoidNamespace, List<Long>> stateTable =
+ StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
((HeapListState<String, VoidNamespace, Long>) state).stateTable;
assertTrue(stateTable.isEmpty());
@@ -214,7 +215,7 @@ public class HeapListStateTest {
state.setCurrentNamespace(namespace1);
state.clear();
- StateTable<String, Integer, List<Long>> stateTable =
+ StateTable<String, Integer, ArrayList<Long>> stateTable =
((HeapListState<String, Integer, Long>) state).stateTable;
assertTrue(stateTable.isEmpty());