You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/15 12:21:12 UTC
[1/2] flink git commit: [FLINK-5790] [core] Use list types when
ListStateDescriptor extends StateDescriptor
Repository: flink
Updated Branches:
refs/heads/master ded7faeab -> 2045cc5f8
[FLINK-5790] [core] Use list types when ListStateDescriptor extends StateDescriptor
This closes #3305
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d47446ca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d47446ca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d47446ca
Branch: refs/heads/master
Commit: d47446cafffe0d34d89488f6eb860aa139ceb3f1
Parents: ded7fae
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Authored: Tue Feb 14 13:39:30 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 12:21:07 2017 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBListState.java | 4 +-
.../api/common/state/ListStateDescriptor.java | 35 +++--
.../flink/api/common/state/StateDescriptor.java | 18 +--
.../common/typeutils/base/ListSerializer.java | 131 +++++++++++++++++++
.../flink/api/java/typeutils/ListTypeInfo.java | 116 ++++++++++++++++
.../util/MigrationInstantiationUtil.java | 2 +-
.../common/state/ListStateDescriptorTest.java | 25 +++-
.../api/common/state/ListStateDescriptor.java | 110 ++++++++++++++++
.../runtime/state/ArrayListSerializer.java | 10 +-
.../state/DefaultOperatorStateBackend.java | 6 +-
.../state/heap/HeapKeyedStateBackend.java | 17 +--
.../flink/runtime/state/heap/HeapListState.java | 25 ++--
.../runtime/state/StateBackendTestBase.java | 2 +-
.../runtime/state/heap/HeapListStateTest.java | 8 +-
...ccumulatingProcessingTimeWindowOperator.java | 8 +-
.../operators/StateDescriptorPassingTest.java | 28 +++-
.../operators/StreamingRuntimeContextTest.java | 10 +-
17 files changed, 476 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index e6988f7..a8b20d1 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -47,7 +47,7 @@ import java.util.List;
* @param <V> The type of the values in the list state.
*/
public class RocksDBListState<K, N, V>
- extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, V>
+ extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>>
implements InternalListState<N, V> {
/** Serializer for the values */
@@ -72,7 +72,7 @@ public class RocksDBListState<K, N, V>
RocksDBKeyedStateBackend<K> backend) {
super(columnFamily, namespaceSerializer, stateDesc, backend);
- this.valueSerializer = stateDesc.getSerializer();
+ this.valueSerializer = stateDesc.getElementSerializer();
writeOptions = new WriteOptions();
writeOptions.setDisableWAL(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 6861a07..2047e24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -21,6 +21,10 @@ package org.apache.flink.api.common.state;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+
+import java.util.List;
/**
* A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
@@ -30,8 +34,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
* @param <T> The type of the values that can be added to the list state.
*/
@PublicEvolving
-public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
- private static final long serialVersionUID = 1L;
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> {
+ private static final long serialVersionUID = 2L;
/**
* Creates a new {@code ListStateDescriptor} with the given name and list element type.
@@ -40,20 +44,22 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
* consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
*
* @param name The (unique) name for the state.
- * @param typeClass The type of the values in the state.
+ * @param elementTypeClass The type of the elements in the state.
*/
- public ListStateDescriptor(String name, Class<T> typeClass) {
- super(name, typeClass, null);
+ @SuppressWarnings("unchecked")
+ public ListStateDescriptor(String name, Class<T> elementTypeClass) {
+ super(name, new ListTypeInfo<>(elementTypeClass), null);
}
/**
* Creates a new {@code ListStateDescriptor} with the given name and list element type.
*
* @param name The (unique) name for the state.
- * @param typeInfo The type of the values in the state.
+ * @param elementTypeInfo The type of the elements in the state.
*/
- public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
- super(name, typeInfo, null);
+ @SuppressWarnings("unchecked")
+ public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {
+ super(name, new ListTypeInfo<>(elementTypeInfo), null);
}
/**
@@ -62,10 +68,19 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
* @param name The (unique) name for the state.
* @param typeSerializer The type serializer for the list values.
*/
+ @SuppressWarnings("unchecked")
public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
- super(name, typeSerializer, null);
+ super(name, new ListSerializer<>(typeSerializer), null);
+ }
+
+ /**
+ * Gets the serializer for the individual elements of the list.
+ *
+ * <p>The serializer held by this descriptor handles {@code List<T>}; this accessor unwraps
+ * it and returns the serializer of a single element.
+ *
+ * @return The serializer for the elements in the list.
+ * @throws IllegalStateException If the descriptor's serializer is not a {@link ListSerializer}
+ * (which includes the case where it is still {@code null}).
+ */
+ public TypeSerializer<T> getElementSerializer() {
+ if (!(serializer instanceof ListSerializer)) {
+ // 'instanceof' is false for null as well, so an unset serializer ends up here too
+ throw new IllegalStateException(
+ "Cannot access the element serializer: expected a ListSerializer but found " + serializer + '.');
+ }
+
+ return ((ListSerializer<T>) serializer).getElementSerializer();
}
-
+
// ------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index b901d03..bc909e6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -243,22 +243,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
}
}
- /**
- * This method should be called by subclasses prior to serialization. Because the TypeInformation is
- * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor
- * needs to make sure that the serializer is created before the TypeInformation is dropped.
- */
- private void ensureSerializerCreated() {
- if (serializer == null) {
- if (typeInfo != null) {
- serializer = typeInfo.createSerializer(new ExecutionConfig());
- } else {
- throw new IllegalStateException(
- "Cannot initialize serializer after TypeInformation was dropped during serialization");
- }
- }
- }
-
// ------------------------------------------------------------------------
// Standard Utils
// ------------------------------------------------------------------------
@@ -287,7 +271,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
private void writeObject(final ObjectOutputStream out) throws IOException {
// make sure we have a serializer before the type information gets lost
- ensureSerializerCreated();
+ initializeSerializerUnlessSet(new ExecutionConfig());
// write all the non-transient fields
out.defaultWriteObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
new file mode 100644
index 0000000..a875a3b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * A {@link TypeSerializer} for {@link List} values that delegates the handling of the
+ * individual elements to an element serializer.
+ *
+ * <p>The serialized form is the number of elements (written as an int), followed by each
+ * element as written by the element serializer, in list order.
+ *
+ * @param <T> The type of the elements in the list.
+ */
+@SuppressWarnings("ForLoopReplaceableByForEach")
+public class ListSerializer<T> extends TypeSerializer<List<T>> {
+
+ private static final long serialVersionUID = 1119562170939152304L;
+
+ /** The serializer used for each element of the list. */
+ private final TypeSerializer<T> elementSerializer;
+
+ /**
+ * Creates a list serializer that uses the given serializer for the list elements.
+ *
+ * @param elementSerializer The serializer for the elements of the list.
+ */
+ public ListSerializer(TypeSerializer<T> elementSerializer) {
+ this.elementSerializer = elementSerializer;
+ }
+
+ /**
+ * Gets the serializer for the elements of the list.
+ *
+ * @return The element serializer passed to the constructor.
+ */
+ public TypeSerializer<T> getElementSerializer() {
+ return elementSerializer;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ /**
+ * Duplicates the element serializer; if that returns the very same instance, this
+ * serializer is returned as is, otherwise a new list serializer wraps the duplicate.
+ */
+ @Override
+ public TypeSerializer<List<T>> duplicate() {
+ TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
+ return duplicateElement == elementSerializer ? this : new ListSerializer<T>(duplicateElement);
+ }
+
+ /** Creates a new, empty {@link ArrayList}. */
+ @Override
+ public List<T> createInstance() {
+ return new ArrayList<>();
+ }
+
+ /**
+ * Copies the list into a new {@link ArrayList}, copying every element through the
+ * element serializer.
+ */
+ @Override
+ public List<T> copy(List<T> from) {
+ List<T> newList = new ArrayList<>(from.size());
+ for (int i = 0; i < from.size(); i++) {
+ newList.add(elementSerializer.copy(from.get(i)));
+ }
+ return newList;
+ }
+
+ /** Ignores {@code reuse} and behaves like {@link #copy(List)}. */
+ @Override
+ public List<T> copy(List<T> from, List<T> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1; // var length
+ }
+
+ /** Writes the list size as an int, then every element in list order. */
+ @Override
+ public void serialize(List<T> list, DataOutputView target) throws IOException {
+ final int size = list.size();
+ target.writeInt(size);
+ for (int i = 0; i < size; i++) {
+ elementSerializer.serialize(list.get(i), target);
+ }
+ }
+
+ /** Reads the list size as an int, then that many elements into a new {@link ArrayList}. */
+ @Override
+ public List<T> deserialize(DataInputView source) throws IOException {
+ final int size = source.readInt();
+ final List<T> list = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ list.add(elementSerializer.deserialize(source));
+ }
+ return list;
+ }
+
+ /** Ignores {@code reuse} and behaves like {@link #deserialize(DataInputView)}. */
+ @Override
+ public List<T> deserialize(List<T> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ /**
+ * Copies one serialized list from {@code source} to {@code target}: the element count
+ * first, then each element via the element serializer.
+ */
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ // copy number of elements
+ final int num = source.readInt();
+ target.writeInt(num);
+ for (int i = 0; i < num; i++) {
+ elementSerializer.copy(source, target);
+ }
+ }
+
+ // --------------------------------------------------------------------
+
+ /** Two list serializers are equal if they have the same class and equal element serializers. */
+ @Override
+ public boolean equals(Object obj) {
+ return obj == this ||
+ (obj != null && obj.getClass() == getClass() &&
+ elementSerializer.equals(((ListSerializer<?>) obj).elementSerializer));
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return true;
+ }
+
+ /** The hash code is that of the element serializer. */
+ @Override
+ public int hashCode() {
+ return elementSerializer.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
new file mode 100644
index 0000000..e70aaf8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+
+import java.util.List;
+
+/**
+ * A {@link TypeInformation} for the list types of the Java API.
+ *
+ * <p>The type is described through the type information of its elements; serializers created
+ * from it are {@link ListSerializer}s wrapping the element serializer.
+ *
+ * @param <T> The type of the elements in the list.
+ */
+@Public
+public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The type information of the list's elements. */
+ private final TypeInformation<T> elementTypeInfo;
+
+ /**
+ * Creates the list type information from the class of the elements; the element type
+ * information is obtained through the {@link TypeExtractor}.
+ *
+ * @param elementTypeClass The class of the elements in the list.
+ */
+ public ListTypeInfo(Class<T> elementTypeClass) {
+ this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+ }
+
+ /**
+ * Creates the list type information from the type information of the elements.
+ *
+ * @param elementTypeInfo The type information of the elements in the list.
+ */
+ public ListTypeInfo(TypeInformation<T> elementTypeInfo) {
+ this.elementTypeInfo = elementTypeInfo;
+ }
+
+ /**
+ * Gets the type information for the elements contained in the list.
+ *
+ * @return The type information of the list's elements.
+ */
+ public TypeInformation<T> getElementTypeInfo() {
+ return elementTypeInfo;
+ }
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 0;
+ }
+
+ /** Reports the total number of fields of the element type. */
+ @Override
+ public int getTotalFields() {
+ return elementTypeInfo.getTotalFields();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<List<T>> getTypeClass() {
+ // the double cast is needed because there is no class literal for List<T>
+ return (Class<List<T>>)(Class<?>)List.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ /** Creates a {@link ListSerializer} around the serializer of the element type. */
+ @Override
+ public TypeSerializer<List<T>> createSerializer(ExecutionConfig config) {
+ TypeSerializer<T> elementTypeSerializer = elementTypeInfo.createSerializer(config);
+ return new ListSerializer<>(elementTypeSerializer);
+ }
+
+ /** Returns a description of the form {@code List<elementType>}; never {@code null}. */
+ @Override
+ public String toString() {
+ return "List<" + elementTypeInfo + '>';
+ }
+
+ /** Two list type infos are equal if their element type infos are equal. */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ListTypeInfo) {
+ @SuppressWarnings("unchecked")
+ ListTypeInfo<T> other = (ListTypeInfo<T>) obj;
+
+ return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * elementTypeInfo.hashCode() + 1;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return (obj instanceof ListTypeInfo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
index ca75cce..d175b2f 100644
--- a/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/migration/util/MigrationInstantiationUtil.java
@@ -91,4 +91,4 @@ public final class MigrationInstantiationUtil {
throw new IllegalAccessError();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index 6dc00f0..b9d9a8c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
@@ -45,13 +46,18 @@ public class ListStateDescriptorTest {
assertEquals("testName", descr.getName());
assertNotNull(descr.getSerializer());
- assertEquals(serializer, descr.getSerializer());
+ assertTrue(descr.getSerializer() instanceof ListSerializer);
+ assertNotNull(descr.getElementSerializer());
+ assertEquals(serializer, descr.getElementSerializer());
ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
assertEquals("testName", copy.getName());
assertNotNull(copy.getSerializer());
- assertEquals(serializer, copy.getSerializer());
+ assertTrue(copy.getSerializer() instanceof ListSerializer);
+
+ assertNotNull(copy.getElementSerializer());
+ assertEquals(serializer, copy.getElementSerializer());
}
@Test
@@ -69,11 +75,14 @@ public class ListStateDescriptorTest {
} catch (IllegalStateException ignored) {}
descr.initializeSerializerUnlessSet(cfg);
-
+
assertNotNull(descr.getSerializer());
- assertTrue(descr.getSerializer() instanceof KryoSerializer);
+ assertTrue(descr.getSerializer() instanceof ListSerializer);
- assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
+ assertNotNull(descr.getElementSerializer());
+ assertTrue(descr.getElementSerializer() instanceof KryoSerializer);
+
+ assertTrue(((KryoSerializer<?>) descr.getElementSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
}
@Test
@@ -85,7 +94,11 @@ public class ListStateDescriptorTest {
ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
assertEquals("testName", copy.getName());
+
assertNotNull(copy.getSerializer());
- assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
+ assertTrue(copy.getSerializer() instanceof ListSerializer);
+
+ assertNotNull(copy.getElementSerializer());
+ assertEquals(StringSerializer.INSTANCE, copy.getElementSerializer());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
new file mode 100644
index 0000000..4e83cca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A {@link StateDescriptor} for {@link ListState}.
+ *
+ * <p>This variant lives in the migration package and uses the element type {@code T} (not
+ * {@code List<T>}) as the descriptor's value type. It cannot be bound to a state backend;
+ * see {@link #bind(StateBackend)}.
+ * NOTE(review): presumably kept so that descriptors written by earlier versions can still
+ * be read - confirm against the migration code paths.
+ *
+ * @param <T> The type of the values that can be added to the list state.
+ */
+@PublicEvolving
+public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+ *
+ * <p>If this constructor fails (because it is not possible to describe the type via a class),
+ * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor.
+ *
+ * @param name The (unique) name for the state.
+ * @param typeClass The type of the values in the state.
+ */
+ public ListStateDescriptor(String name, Class<T> typeClass) {
+ super(name, typeClass, null);
+ }
+
+ /**
+ * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+ *
+ * @param name The (unique) name for the state.
+ * @param typeInfo The type of the values in the state.
+ */
+ public ListStateDescriptor(String name, TypeInformation<T> typeInfo) {
+ super(name, typeInfo, null);
+ }
+
+ /**
+ * Creates a new {@code ListStateDescriptor} with the given name and list element type.
+ *
+ * @param name The (unique) name for the state.
+ * @param typeSerializer The type serializer for the list values.
+ */
+ public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
+ super(name, typeSerializer, null);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Always fails: a legacy descriptor cannot be bound to a state backend.
+ *
+ * @throws IllegalStateException Always.
+ */
+ @Override
+ public ListState<T> bind(StateBackend stateBackend) throws Exception {
+ throw new IllegalStateException("Cannot bind states with a legacy state descriptor.");
+ }
+
+ /** Two descriptors are equal if they have the same class, serializer and name. */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
+
+ return serializer.equals(that.serializer) && name.equals(that.name);
+
+ }
+
+ /** Combines the hash codes of the serializer and the name. */
+ @Override
+ public int hashCode() {
+ int result = serializer.hashCode();
+ result = 31 * result + name.hashCode();
+ return result;
+ }
+
+ /** Describes the descriptor by its serializer only; the name is not included. */
+ @Override
+ public String toString() {
+ return "ListStateDescriptor{" +
+ "serializer=" + serializer +
+ '}';
+ }
+
+ /** Identifies this descriptor as a list state descriptor. */
+ @Override
+ public org.apache.flink.api.common.state.StateDescriptor.Type getType() {
+ return org.apache.flink.api.common.state.StateDescriptor.Type.LIST;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 43e6786..f5a6405 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -35,6 +35,10 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
this.elementSerializer = elementSerializer;
}
+ public TypeSerializer<T> getElementSerializer() {
+ return elementSerializer;
+ }
+
@Override
public boolean isImmutableType() {
return false;
@@ -109,8 +113,8 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
@Override
public boolean equals(Object obj) {
return obj == this ||
- (obj != null && obj.getClass() == getClass() &&
- elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
+ (obj != null && obj.getClass() == getClass() &&
+ elementSerializer.equals(((ArrayListSerializer<?>) obj).elementSerializer));
}
@Override
@@ -122,4 +126,4 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> {
public int hashCode() {
return elementSerializer.hashCode();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 1cd1da7..adf0727 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -106,7 +106,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
Preconditions.checkNotNull(stateDescriptor);
String name = Preconditions.checkNotNull(stateDescriptor.getName());
- TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getSerializer());
+ TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
@SuppressWarnings("unchecked")
PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
@@ -126,8 +126,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
partitionableListState.getAssignmentMode());
Preconditions.checkState(
partitionableListState.getPartitionStateSerializer().
- isCompatibleWith(stateDescriptor.getSerializer()),
- "Incompatible type serializers. Provided: " + stateDescriptor.getSerializer() +
+ isCompatibleWith(stateDescriptor.getElementSerializer()),
+ "Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() +
", found: " + partitionableListState.getPartitionStateSerializer());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 0fe92e7..2366342 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
-
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -28,6 +27,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -60,7 +60,6 @@ import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,15 +151,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
- String name = stateDesc.getName();
-
- @SuppressWarnings("unchecked")
- StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(name);
- RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo =
- new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()));
-
- stateTable = tryRegisterStateTable(stateTable, newMetaInfo);
+ StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@@ -449,6 +441,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
}
+ // The serializer used in the list states now changes from ArrayListSerializer to ListSerializer.
+ if (stateSerializer instanceof ArrayListSerializer) {
+ stateSerializer = new ListSerializer<>(((ArrayListSerializer<?>) stateSerializer).getElementSerializer());
+ }
+
Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
if (null != nullNameSpaceFix) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index f0eb53e..a4e8ea7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/**
@@ -40,7 +41,7 @@ import java.util.Map;
* @param <V> The type of the value.
*/
public class HeapListState<K, N, V>
- extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
+ extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
implements InternalListState<N, V> {
/**
@@ -54,7 +55,7 @@ public class HeapListState<K, N, V>
public HeapListState(
KeyedStateBackend<K> backend,
ListStateDescriptor<V> stateDesc,
- StateTable<K, N, ArrayList<V>> stateTable,
+ StateTable<K, N, List<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
@@ -69,14 +70,14 @@ public class HeapListState<K, N, V>
Preconditions.checkState(currentNamespace != null, "No namespace set.");
Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
+ Map<N, Map<K, List<V>>> namespaceMap =
stateTable.get(backend.getCurrentKeyGroupIndex());
if (namespaceMap == null) {
return null;
}
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
return null;
@@ -95,7 +96,7 @@ public class HeapListState<K, N, V>
return;
}
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
+ Map<N, Map<K, List<V>>> namespaceMap =
stateTable.get(backend.getCurrentKeyGroupIndex());
if (namespaceMap == null) {
@@ -103,14 +104,14 @@ public class HeapListState<K, N, V>
stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
}
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
keyedMap = createNewMap();
namespaceMap.put(currentNamespace, keyedMap);
}
- ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
+ List<V> list = keyedMap.get(backend.<K>getCurrentKey());
if (list == null) {
list = new ArrayList<>();
@@ -124,26 +125,26 @@ public class HeapListState<K, N, V>
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");
- Map<N, Map<K, ArrayList<V>>> namespaceMap =
+ Map<N, Map<K, List<V>>> namespaceMap =
stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
if (namespaceMap == null) {
return null;
}
- Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
return null;
}
- ArrayList<V> result = keyedMap.get(key);
+ List<V> result = keyedMap.get(key);
if (result == null) {
return null;
}
- TypeSerializer<V> serializer = stateDesc.getSerializer();
+ TypeSerializer<V> serializer = stateDesc.getElementSerializer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
@@ -165,7 +166,7 @@ public class HeapListState<K, N, V>
// ------------------------------------------------------------------------
@Override
- protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
+ protected List<V> mergeState(List<V> a, List<V> b) {
a.addAll(b);
return a;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b4bf664..7737ecf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -494,7 +494,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
- TypeSerializer<String> valueSerializer = kvId.getSerializer();
+ TypeSerializer<String> valueSerializer = kvId.getElementSerializer();
ListState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 33d60a0..f1b071a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.ExecutionConfig;
-
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -29,11 +28,10 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
-
import org.junit.Test;
-import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import static java.util.Arrays.asList;
@@ -96,7 +94,7 @@ public class HeapListStateTest {
// make sure all lists / maps are cleared
- StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
+ StateTable<String, VoidNamespace, List<Long>> stateTable =
((HeapListState<String, VoidNamespace, Long>) state).stateTable;
assertTrue(stateTable.isEmpty());
@@ -216,7 +214,7 @@ public class HeapListStateTest {
state.setCurrentNamespace(namespace1);
state.clear();
- StateTable<String, Integer, ArrayList<Long>> stateTable =
+ StateTable<String, Integer, List<Long>> stateTable =
((HeapListState<String, Integer, Long>) state).stateTable;
assertTrue(stateTable.isEmpty());
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 90e4b52..7adaf13 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -31,12 +31,12 @@ import java.util.ArrayList;
@Internal
@Deprecated
-public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
+public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, ArrayList<IN>, WindowFunction<IN, OUT, KEY, TimeWindow>> {
private static final long serialVersionUID = 7305948082830843475L;
-
+
public AccumulatingProcessingTimeWindowOperator(
WindowFunction<IN, OUT, KEY, TimeWindow> function,
KeySelector<IN, KEY> keySelector,
@@ -53,7 +53,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
@SuppressWarnings("unchecked")
WindowFunction<IN, OUT, KEY, Window> windowFunction = (WindowFunction<IN, OUT, KEY, Window>) function;
-
+
return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index c0ca6a0..26cb7ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -23,8 +23,10 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -130,7 +132,7 @@ public class StateDescriptorPassingTest {
Iterable<File> input, Collector<String> out) {}
});
- validateStateDescriptorConfigured(result);
+ validateListStateDescriptorConfigured(result);
}
@Test
@@ -190,7 +192,7 @@ public class StateDescriptorPassingTest {
public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
});
- validateStateDescriptorConfigured(result);
+ validateListStateDescriptorConfigured(result);
}
// ------------------------------------------------------------------------
@@ -211,4 +213,26 @@ public class StateDescriptorPassingTest {
assertTrue("serializer registration was not properly passed on",
kryo.getSerializer(File.class) instanceof JavaSerializer);
}
+
+ private void validateListStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
+ OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation();
+ WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator();
+ StateDescriptor<?, ?> descr = op.getStateDescriptor();
+
+ assertTrue(descr instanceof ListStateDescriptor);
+
+ ListStateDescriptor<?> listDescr = (ListStateDescriptor<?>)descr;
+
+ // this would be the first statement to fail if state descriptors were not properly initialized
+ TypeSerializer<?> serializer = listDescr.getSerializer();
+ assertTrue(serializer instanceof ListSerializer);
+
+ TypeSerializer<?> elementSerializer = listDescr.getElementSerializer();
+ assertTrue(elementSerializer instanceof KryoSerializer);
+
+ Kryo kryo = ((KryoSerializer<?>) elementSerializer).getKryo();
+
+ assertTrue("serializer registration was not properly passed on",
+ kryo.getSerializer(File.class) instanceof JavaSerializer);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d47446ca/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index 2791726..294b8da 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.Path;
@@ -162,12 +163,15 @@ public class StreamingRuntimeContextTest {
ListStateDescriptor<TaskInfo> descr = new ListStateDescriptor<>("name", TaskInfo.class);
context.getListState(descr);
- StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get();
+ ListStateDescriptor<?> descrIntercepted = (ListStateDescriptor<?>) descriptorCapture.get();
TypeSerializer<?> serializer = descrIntercepted.getSerializer();
// check that the Path class is really registered, i.e., the execution config was applied
- assertTrue(serializer instanceof KryoSerializer);
- assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0);
+ assertTrue(serializer instanceof ListSerializer);
+
+ TypeSerializer<?> elementSerializer = descrIntercepted.getElementSerializer();
+ assertTrue(elementSerializer instanceof KryoSerializer);
+ assertTrue(((KryoSerializer<?>) elementSerializer).getKryo().getRegistration(Path.class).getId() > 0);
}
@Test
[2/2] flink git commit: [FLINK-5790] [core] Followups and tests for
the StateDescriptor changes
Posted by se...@apache.org.
[FLINK-5790] [core] Followups and tests for the StateDescriptor changes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2045cc5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2045cc5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2045cc5f
Branch: refs/heads/master
Commit: 2045cc5f84ab39f18f423154c5620a79ac6d44ba
Parents: d47446c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Feb 14 15:32:03 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 15 13:10:30 2017 +0100
----------------------------------------------------------------------
.../api/common/state/ListStateDescriptor.java | 45 +++++++----
.../common/typeutils/base/ListSerializer.java | 50 ++++++++++--
.../flink/api/java/typeutils/ListTypeInfo.java | 45 ++++++++---
.../common/typeutils/SerializerTestBase.java | 7 +-
.../typeutils/base/ListSerializerTest.java | 83 ++++++++++++++++++++
.../api/common/state/ListStateDescriptor.java | 10 +--
.../state/heap/HeapKeyedStateBackend.java | 40 ++++++----
.../flink/runtime/state/heap/HeapListState.java | 29 ++++---
.../runtime/state/heap/HeapListStateTest.java | 7 +-
9 files changed, 237 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 2047e24..c03f8cb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -27,8 +27,14 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo;
import java.util.List;
/**
- * A {@link StateDescriptor} for {@link ListState}. This can be used to create a partitioned
- * list state using
+ * A {@link StateDescriptor} for {@link ListState}. This can be used to create state where the type
+ * is a list that can be appended and iterated over.
+ *
+ * <p>Using {@code ListState} is typically more efficient than manually maintaining a list in a
+ * {@link ValueState}, because the backing implementation can support efficient appends, rather than
+ * replacing the full list on write.
+ *
+ * <p>To create keyed list state (on a KeyedStream), use
* {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}.
*
* @param <T> The type of the values that can be added to the list state.
@@ -46,7 +52,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
* @param name The (unique) name for the state.
* @param elementTypeClass The type of the elements in the state.
*/
- @SuppressWarnings("unchecked")
public ListStateDescriptor(String name, Class<T> elementTypeClass) {
super(name, new ListTypeInfo<>(elementTypeClass), null);
}
@@ -57,7 +62,6 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
* @param name The (unique) name for the state.
* @param elementTypeInfo The type of the elements in the state.
*/
- @SuppressWarnings("unchecked")
public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {
super(name, new ListTypeInfo<>(elementTypeInfo), null);
}
@@ -68,26 +72,39 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
* @param name The (unique) name for the state.
* @param typeSerializer The type serializer for the list values.
*/
- @SuppressWarnings("unchecked")
public ListStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
super(name, new ListSerializer<>(typeSerializer), null);
}
+ // ------------------------------------------------------------------------
+
+ @Override
+ public ListState<T> bind(StateBackend stateBackend) throws Exception {
+ return stateBackend.createListState(this);
+ }
+
+ /**
+ * Gets the serializer for the elements contained in the list.
+ *
+ * @return The serializer for the elements in the list.
+ */
public TypeSerializer<T> getElementSerializer() {
- if (!(serializer instanceof ListSerializer)) {
+ // call getSerializer() here to get the initialization check and proper error message
+ final TypeSerializer<List<T>> rawSerializer = getSerializer();
+ if (!(rawSerializer instanceof ListSerializer)) {
throw new IllegalStateException();
}
return ((ListSerializer<T>)serializer).getElementSerializer();
}
- // ------------------------------------------------------------------------
-
@Override
- public ListState<T> bind(StateBackend stateBackend) throws Exception {
- return stateBackend.createListState(this);
+ public Type getType() {
+ return Type.LIST;
}
+ // ------------------------------------------------------------------------
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -97,8 +114,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
return false;
}
- ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-
+ final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
return serializer.equals(that.serializer) && name.equals(that.name);
}
@@ -116,9 +132,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
"serializer=" + serializer +
'}';
}
-
- @Override
- public Type getType() {
- return Type.LIST;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index a875a3b..ca3c143 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -26,21 +26,49 @@ import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
-@SuppressWarnings("ForLoopReplaceableByForEach")
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A serializer for {@link List Lists}. The serializer relies on an element serializer
+ * for the serialization of the list's elements.
+ *
+ * <p>The serialization format for the list is as follows: four bytes for the length of the list,
+ * followed by the serialized representation of each element.
+ *
+ * @param <T> The type of element in the list.
+ */
public class ListSerializer<T> extends TypeSerializer<List<T>> {
private static final long serialVersionUID = 1119562170939152304L;
+ /** The serializer for the elements of the list */
private final TypeSerializer<T> elementSerializer;
+ /**
+ * Creates a list serializer that uses the given serializer to serialize the list's elements.
+ *
+ * @param elementSerializer The serializer for the elements of the list
+ */
public ListSerializer(TypeSerializer<T> elementSerializer) {
- this.elementSerializer = elementSerializer;
+ this.elementSerializer = checkNotNull(elementSerializer);
}
+ // ------------------------------------------------------------------------
+ // ListSerializer specific properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the serializer for the elements of the list.
+ * @return The serializer for the elements of the list
+ */
public TypeSerializer<T> getElementSerializer() {
return elementSerializer;
}
+ // ------------------------------------------------------------------------
+ // Type Serializer implementation
+ // ------------------------------------------------------------------------
+
@Override
public boolean isImmutableType() {
return false;
@@ -54,14 +82,18 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> {
@Override
public List<T> createInstance() {
- return new ArrayList<>();
+ return new ArrayList<>(0);
}
@Override
public List<T> copy(List<T> from) {
List<T> newList = new ArrayList<>(from.size());
- for (int i = 0; i < from.size(); i++) {
- newList.add(elementSerializer.copy(from.get(i)));
+
+ // We iterate here rather than accessing by index, because we cannot be sure that
+ // the given list supports RandomAccess.
+ // The Iterator should be stack allocated on new JVMs (due to escape analysis)
+ for (T element : from) {
+ newList.add(elementSerializer.copy(element));
}
return newList;
}
@@ -80,8 +112,12 @@ public class ListSerializer<T> extends TypeSerializer<List<T>> {
public void serialize(List<T> list, DataOutputView target) throws IOException {
final int size = list.size();
target.writeInt(size);
- for (int i = 0; i < size; i++) {
- elementSerializer.serialize(list.get(i), target);
+
+ // We iterate here rather than accessing by index, because we cannot be sure that
+ // the given list supports RandomAccess.
+ // The Iterator should be stack allocated on new JVMs (due to escape analysis)
+ for (T element : list) {
+ elementSerializer.serialize(element, target);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
index e70aaf8..763be98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
@@ -18,7 +18,7 @@
package org.apache.flink.api.java.typeutils;
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -26,30 +26,44 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link TypeInformation} for the list types of the Java API.
*
* @param <T> The type of the elements in the list.
*/
-
-
-@Public
+@PublicEvolving
public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
+ private static final long serialVersionUID = 1L;
+
private final TypeInformation<T> elementTypeInfo;
+
public ListTypeInfo(Class<T> elementTypeClass) {
- this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+ this.elementTypeInfo = of(checkNotNull(elementTypeClass, "elementTypeClass"));
}
public ListTypeInfo(TypeInformation<T> elementTypeInfo) {
- this.elementTypeInfo = elementTypeInfo;
+ this.elementTypeInfo = checkNotNull(elementTypeInfo, "elementTypeInfo");
}
+ // ------------------------------------------------------------------------
+ // ListTypeInfo specific properties
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the type information for the elements contained in the list
+ */
public TypeInformation<T> getElementTypeInfo() {
return elementTypeInfo;
}
+ // ------------------------------------------------------------------------
+ // TypeInformation implementation
+ // ------------------------------------------------------------------------
+
@Override
public boolean isBasicType() {
return false;
@@ -67,7 +81,9 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
@Override
public int getTotalFields() {
- return elementTypeInfo.getTotalFields();
+ // similar to arrays, the lists are "opaque" to the direct field addressing logic
+ // since the list's elements are not addressable, we do not expose them
+ return 1;
}
@SuppressWarnings("unchecked")
@@ -87,17 +103,20 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
return new ListSerializer<>(elementTypeSerializer);
}
+ // ------------------------------------------------------------------------
+
@Override
public String toString() {
- return null;
+ return "List<" + elementTypeInfo + '>';
}
@Override
public boolean equals(Object obj) {
- if (obj instanceof ListTypeInfo) {
- @SuppressWarnings("unchecked")
- ListTypeInfo<T> other = (ListTypeInfo<T>) obj;
-
+ if (obj == this) {
+ return true;
+ }
+ else if (obj instanceof ListTypeInfo) {
+ final ListTypeInfo<?> other = (ListTypeInfo<?>) obj;
return other.canEqual(this) && elementTypeInfo.equals(other.elementTypeInfo);
} else {
return false;
@@ -111,6 +130,6 @@ public final class ListTypeInfo<T> extends TypeInformation<List<T>> {
@Override
public boolean canEqual(Object obj) {
- return (obj instanceof ListTypeInfo);
+ return obj != null && obj.getClass() == getClass();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index ea91e56..91c6145 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -73,8 +73,11 @@ public abstract class SerializerTestBase<T> extends TestLogger {
Class<T> type = getTypeClass();
assertNotNull("The test is corrupt: type class is null.", type);
-
- assertEquals("Type of the instantiated object is wrong.", type, instance.getClass());
+
+ if (!type.isAssignableFrom(instance.getClass())) {
+ fail("Type of the instantiated object is wrong. " +
+ "Expected Type: " + type + " present type " + instance.getClass());
+ }
}
catch (Exception e) {
System.err.println(e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
new file mode 100644
index 0000000..28cdc13
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A test for the {@link ListSerializer}.
+ */
+public class ListSerializerTest extends SerializerTestBase<List<Long>> {
+
+ @Override
+ protected TypeSerializer<List<Long>> createSerializer() {
+ return new ListSerializer<>(LongSerializer.INSTANCE);
+ }
+
+ @Override
+ protected int getLength() {
+ return -1;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected Class<List<Long>> getTypeClass() {
+ return (Class<List<Long>>) (Class<?>) List.class;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ protected List<Long>[] getTestData() {
+ final Random rnd = new Random(123654789);
+
+ // empty lists
+ final List<Long> list1 = Collections.emptyList();
+ final List<Long> list2 = new LinkedList<>();
+ final List<Long> list3 = new ArrayList<>();
+
+ // single element lists
+ final List<Long> list4 = Collections.singletonList(55L);
+ final List<Long> list5 = new LinkedList<>();
+ list5.add(12345L);
+ final List<Long> list6 = new ArrayList<>();
+ list6.add(777888L);
+
+ // longer lists
+ final List<Long> list7 = new LinkedList<>();
+ for (int i = 0; i < rnd.nextInt(200); i++) {
+ list7.add(rnd.nextLong());
+ }
+
+ final List<Long> list8 = new ArrayList<>();
+ for (int i = 0; i < rnd.nextInt(200); i++) {
+ list8.add(rnd.nextLong());
+ }
+
+ return (List<Long>[]) new List[] {
+ list1, list2, list3, list4, list5, list6, list7, list8
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
index 4e83cca..28bc812 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -18,7 +18,7 @@
package org.apache.flink.migration.api.common.state;
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.StateBackend;
import org.apache.flink.api.common.state.StateDescriptor;
@@ -26,11 +26,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
/**
- * A {@link StateDescriptor} for {@link ListState}.
- *
- * @param <T> The type of the values that can be added to the list state.
+ * The old version of the {@link org.apache.flink.api.common.state.ListStateDescriptor}, retained for
+ * serialization backwards compatibility.
*/
-@PublicEvolving
+@Internal
+@Deprecated
public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 2366342..e386e0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -108,25 +107,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// state backend operations
// ------------------------------------------------------------------------
- @SuppressWarnings("unchecked")
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
- String name = stateDesc.getName();
- StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(name);
-
- RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
- new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, stateDesc.getSerializer());
-
- return tryRegisterStateTable(stateTable, newMetaInfo);
+ return tryRegisterStateTable(
+ stateDesc.getName(), stateDesc.getType(),
+ namespaceSerializer, stateDesc.getSerializer());
}
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
- StateTable<K, N, V> stateTable, RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+ String stateName,
+ StateDescriptor.Type stateType,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<V> valueSerializer) {
+
+ final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+
+ @SuppressWarnings("unchecked")
+ StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
if (stateTable == null) {
stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
- stateTables.put(newMetaInfo.getName(), stateTable);
+ stateTables.put(stateName, stateTable);
} else {
if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
throw new RuntimeException("Trying to access state using incompatible meta info, was " +
@@ -151,8 +154,16 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
+ // The list state does some manual mapping, because the state is typed to the generic
+ // 'List' interface, but we want to use an implementation typed to ArrayList.
+ // Using a more specialized implementation opens up runtime optimizations.
+
+ StateTable<K, N, ArrayList<T>> stateTable = tryRegisterStateTable(
+ stateDesc.getName(),
+ stateDesc.getType(),
+ namespaceSerializer,
+ new ArrayListSerializer<T>(stateDesc.getElementSerializer()));
- StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@@ -441,11 +452,6 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
namespaceSerializer = VoidNamespaceSerializer.INSTANCE;
}
- // The serializer used in the list states now changes from ArrayListSerializer to ListSerializer.
- if (stateSerializer instanceof ArrayListSerializer) {
- stateSerializer = new ListSerializer<>(((ArrayListSerializer<?>) stateSerializer).getElementSerializer());
- }
-
Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
if (null != nullNameSpaceFix) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index a4e8ea7..02c3067 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -29,19 +29,18 @@ import org.apache.flink.util.Preconditions;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
/**
* Heap-backed partitioned {@link org.apache.flink.api.common.state.ListState} that is snapshotted
* into files.
- *
+ *
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of the value.
*/
public class HeapListState<K, N, V>
- extends AbstractHeapMergingState<K, N, V, Iterable<V>, List<V>, ListState<V>, ListStateDescriptor<V>>
+ extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
implements InternalListState<N, V> {
/**
@@ -55,7 +54,7 @@ public class HeapListState<K, N, V>
public HeapListState(
KeyedStateBackend<K> backend,
ListStateDescriptor<V> stateDesc,
- StateTable<K, N, List<V>> stateTable,
+ StateTable<K, N, ArrayList<V>> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer) {
super(backend, stateDesc, stateTable, keySerializer, namespaceSerializer);
@@ -70,14 +69,14 @@ public class HeapListState<K, N, V>
Preconditions.checkState(currentNamespace != null, "No namespace set.");
Preconditions.checkState(backend.getCurrentKey() != null, "No key set.");
- Map<N, Map<K, List<V>>> namespaceMap =
+ Map<N, Map<K, ArrayList<V>>> namespaceMap =
stateTable.get(backend.getCurrentKeyGroupIndex());
if (namespaceMap == null) {
return null;
}
- Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
return null;
@@ -96,7 +95,7 @@ public class HeapListState<K, N, V>
return;
}
- Map<N, Map<K, List<V>>> namespaceMap =
+ Map<N, Map<K, ArrayList<V>>> namespaceMap =
stateTable.get(backend.getCurrentKeyGroupIndex());
if (namespaceMap == null) {
@@ -104,14 +103,14 @@ public class HeapListState<K, N, V>
stateTable.set(backend.getCurrentKeyGroupIndex(), namespaceMap);
}
- Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
keyedMap = createNewMap();
namespaceMap.put(currentNamespace, keyedMap);
}
- List<V> list = keyedMap.get(backend.<K>getCurrentKey());
+ ArrayList<V> list = keyedMap.get(backend.<K>getCurrentKey());
if (list == null) {
list = new ArrayList<>();
@@ -119,26 +118,26 @@ public class HeapListState<K, N, V>
}
list.add(value);
}
-
+
@Override
public byte[] getSerializedValue(K key, N namespace) throws Exception {
Preconditions.checkState(namespace != null, "No namespace given.");
Preconditions.checkState(key != null, "No key given.");
- Map<N, Map<K, List<V>>> namespaceMap =
+ Map<N, Map<K, ArrayList<V>>> namespaceMap =
stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
if (namespaceMap == null) {
return null;
}
- Map<K, List<V>> keyedMap = namespaceMap.get(currentNamespace);
+ Map<K, ArrayList<V>> keyedMap = namespaceMap.get(currentNamespace);
if (keyedMap == null) {
return null;
}
- List<V> result = keyedMap.get(key);
+ ArrayList<V> result = keyedMap.get(key);
if (result == null) {
return null;
@@ -166,8 +165,8 @@ public class HeapListState<K, N, V>
// ------------------------------------------------------------------------
@Override
- protected List<V> mergeState(List<V> a, List<V> b) {
+ protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
a.addAll(b);
return a;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2045cc5f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index f1b071a..746db28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -28,10 +28,11 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
+
import org.junit.Test;
+import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import static java.util.Arrays.asList;
@@ -94,7 +95,7 @@ public class HeapListStateTest {
// make sure all lists / maps are cleared
- StateTable<String, VoidNamespace, List<Long>> stateTable =
+ StateTable<String, VoidNamespace, ArrayList<Long>> stateTable =
((HeapListState<String, VoidNamespace, Long>) state).stateTable;
assertTrue(stateTable.isEmpty());
@@ -214,7 +215,7 @@ public class HeapListStateTest {
state.setCurrentNamespace(namespace1);
state.clear();
- StateTable<String, Integer, List<Long>> stateTable =
+ StateTable<String, Integer, ArrayList<Long>> stateTable =
((HeapListState<String, Integer, Long>) state).stateTable;
assertTrue(stateTable.isEmpty());