You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 20:29:00 UTC
[2/2] flink git commit: [FLINK-4744] [streaming api] Followup: Unify
names for operator state access methods and comments.
[FLINK-4744] [streaming api] Followup: Unify names for operator state access methods and comments.
Also make JavaSerializer package private, as it is not intended for use as a proper TypeSerializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10a42f95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10a42f95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10a42f95
Branch: refs/heads/master
Commit: 10a42f951c5143537c28a0f9df65627e5c632c4b
Parents: 56cba7e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 5 15:30:26 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 19:36:13 2016 +0200
----------------------------------------------------------------------
.../api/common/state/OperatorStateStore.java | 61 ++++++++++
.../java/typeutils/runtime/JavaSerializer.java | 119 ------------------
.../state/DefaultOperatorStateBackend.java | 8 +-
.../flink/runtime/state/JavaSerializer.java | 122 +++++++++++++++++++
.../runtime/state/OperatorStateBackend.java | 2 +
.../flink/runtime/state/OperatorStateStore.java | 60 ---------
.../runtime/state/OperatorStateBackendTest.java | 15 ++-
.../kafka/FlinkKafkaConsumerBase.java | 6 +-
.../kafka/FlinkKafkaProducerBase.java | 2 +-
.../kafka/AtLeastOnceProducerTest.java | 1 -
.../kafka/FlinkKafkaConsumerBaseTest.java | 14 +--
.../api/checkpoint/CheckpointedFunction.java | 4 +-
.../operators/AbstractUdfStreamOperator.java | 6 +-
.../operators/StreamCheckpointedOperator.java | 5 +-
.../streaming/runtime/tasks/StreamTask.java | 2 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 4 +-
16 files changed, 217 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
new file mode 100644
index 0000000..03c11f6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Interface for a backend that manages operator state.
+ */
+public interface OperatorStateStore {
+
+ String DEFAULT_OPERATOR_STATE_NAME = "_default_";
+
+ /**
+ * Creates a list state of the given name that uses Java serialization to persist the
+ * state.
+ *
+ * <p>This is a simple convenience method. For more flexibility on how state serialization
+ * should happen, use the {@link #getOperatorState(ListStateDescriptor)} method.
+ *
+ * @param stateName The name of state to create
+ * @return A list state using Java serialization to serialize state objects.
+ * @throws Exception
+ */
+ ListState<Serializable> getSerializableListState(String stateName) throws Exception;
+
+ /**
+ * Creates (or restores) a list state. Each state is registered under a unique name.
+ * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
+ *
+ * @param stateDescriptor The descriptor for this state, providing a name and serializer.
+ * @param <S> The generic type of the state
+ *
+ * @return A list for all state partitions.
+ * @throws Exception
+ */
+ <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
+
+ /**
+ * Returns a set with the names of all currently registered states.
+ * @return set of names for all registered states.
+ */
+ Set<String> getRegisteredStateNames();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
deleted file mode 100644
index 3af7653..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final ClassLoader userClassLoader;
-
- public JavaSerializer() {
- this(Thread.currentThread().getContextClassLoader());
- }
-
- public JavaSerializer(ClassLoader userClassLoader) {
- this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
- }
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public TypeSerializer<T> duplicate() {
- return this;
- }
-
- @Override
- public T createInstance() {
- return null;
- }
-
- @Override
- public T copy(T from) {
-
- try {
- return InstantiationUtil.clone(from);
- } catch (IOException | ClassNotFoundException e) {
- throw new RuntimeException("Could not copy instance of " + from + '.', e);
- }
- }
-
- @Override
- public T copy(T from, T reuse) {
- return copy(from);
- }
-
- @Override
- public int getLength() {
- return 0;
- }
-
- @Override
- public void serialize(T record, DataOutputView target) throws IOException {
- InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
- }
-
- @Override
- public T deserialize(DataInputView source) throws IOException {
- try {
- return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
- } catch (ClassNotFoundException e) {
- throw new IOException("Could not deserialize object.", e);
- }
- }
-
- @Override
- public T deserialize(T reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- int size = source.readInt();
- target.writeInt(size);
- target.write(source, size);
- }
-
- @Override
- public boolean equals(Object obj) {
- return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof JavaSerializer;
- }
-
- @Override
- public int hashCode() {
- return getClass().hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index af97a3f..b1ab7e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputView;
@@ -74,15 +74,15 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
@Override
- public ListState<Serializable> getDefaultPartitionableState(String stateName) throws Exception {
- return getPartitionableState(new ListStateDescriptor<>(stateName, javaSerializer));
+ public ListState<Serializable> getSerializableListState(String stateName) throws Exception {
+ return getOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
}
/**
* @see OperatorStateStore
*/
@Override
- public <S> ListState<S> getPartitionableState(
+ public <S> ListState<S> getOperatorState(
ListStateDescriptor<S> stateDescriptor) throws IOException {
Preconditions.checkNotNull(stateDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
new file mode 100644
index 0000000..2eb9595
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+@Internal
+final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+
+ private final ClassLoader userClassLoader;
+
+ public JavaSerializer() {
+ this(Thread.currentThread().getContextClassLoader());
+ }
+
+ public JavaSerializer(ClassLoader userClassLoader) {
+ this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<T> duplicate() {
+ return this;
+ }
+
+ @Override
+ public T createInstance() {
+ return null;
+ }
+
+ @Override
+ public T copy(T from) {
+
+ try {
+ return InstantiationUtil.clone(from);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException("Could not copy instance of " + from + '.', e);
+ }
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return 0;
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
+ }
+
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ try {
+ return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Could not deserialize object.", e);
+ }
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ int size = source.readInt();
+ target.writeInt(size);
+ target.write(source, size);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof JavaSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index 4e980b7..83e6369 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.api.common.state.OperatorStateStore;
+
import java.io.Closeable;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
deleted file mode 100644
index ceab87f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
-
-import java.io.Serializable;
-import java.util.Set;
-
-/**
- * Interface for a backend that manages partitionable operator state.
- */
-public interface OperatorStateStore {
-
- String DEFAULT_OPERATOR_STATE_NAME = "";
-
- /**
- * Creates a satte descriptor of the given name that uses {@link JavaSerializer}.
- *
- * @param stateName The name of state to create
- * @return A state descriptor that uses {@link JavaSerializer}
- * @throws Exception
- */
- ListState<Serializable> getDefaultPartitionableState(String stateName) throws Exception;
-
- /**
- * Creates (or restores) the partitionable state in this backend. Each state is registered under a unique name.
- * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
- *
- * @param stateDescriptor The descriptr for this state, providing a name and serializer
- * @param <S> The generic type of the state
- * @return A list for all state partitions.
- * @throws Exception
- */
- <S> ListState<S> getPartitionableState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
- /**
- * Returns a set with the names of all currently registered states.
- * @return set of names for all registered states.
- */
- Set<String> getRegisteredStateNames();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index ff1a23d..2db8735 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.junit.Test;
@@ -61,7 +60,7 @@ public class OperatorStateBackendTest {
OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
- ListState<Serializable> listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
+ ListState<Serializable> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
assertNotNull(listState1);
assertEquals(1, operatorStateBackend.getRegisteredStateNames().size());
Iterator<Serializable> it = listState1.get().iterator();
@@ -74,7 +73,7 @@ public class OperatorStateBackendTest {
assertEquals(4711, it.next());
assertTrue(!it.hasNext());
- ListState<Serializable> listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+ ListState<Serializable> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
assertNotNull(listState2);
assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
assertTrue(!it.hasNext());
@@ -88,7 +87,7 @@ public class OperatorStateBackendTest {
assertEquals(23, it.next());
assertTrue(!it.hasNext());
- ListState<Serializable> listState1b = operatorStateBackend.getPartitionableState(stateDescriptor1);
+ ListState<Serializable> listState1b = operatorStateBackend.getOperatorState(stateDescriptor1);
assertNotNull(listState1b);
listState1b.add(123);
it = listState1b.get().iterator();
@@ -115,8 +114,8 @@ public class OperatorStateBackendTest {
OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
- ListState<Serializable> listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
- ListState<Serializable> listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+ ListState<Serializable> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+ ListState<Serializable> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
listState1.add(42);
listState1.add(4711);
@@ -137,8 +136,8 @@ public class OperatorStateBackendTest {
assertEquals(0, operatorStateBackend.getRegisteredStateNames().size());
- listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
- listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+ listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+ listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index a30341b..8d63345 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -313,7 +313,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
this.stateStore = stateStore;
ListState<Serializable> offsets =
- stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+ stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
restoreToOffset = new HashMap<>();
@@ -333,7 +333,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
} else {
ListState<Serializable> listState =
- stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+ stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
listState.clear();
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 8b87004..f0975dc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
index 766a107..d2d7fca 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorStateStore;
import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 45b45f0..373d6ab 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -22,7 +22,7 @@ import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -98,7 +98,7 @@ public class FlinkKafkaConsumerBaseTest {
FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
- when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+ when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
consumer.prepareSnapshot(17L, 17L);
@@ -121,10 +121,10 @@ public class FlinkKafkaConsumerBaseTest {
FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
- when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(expectedState);
+ when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(expectedState);
consumer.initializeState(operatorStateStore);
- when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
+ when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
consumer.prepareSnapshot(17L, 17L);
@@ -153,7 +153,7 @@ public class FlinkKafkaConsumerBaseTest {
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
TestingListState<Serializable> listState = new TestingListState<>();
- when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
+ when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
consumer.initializeState(operatorStateStore);
consumer.prepareSnapshot(17L, 17L);
@@ -190,7 +190,7 @@ public class FlinkKafkaConsumerBaseTest {
TestingListState<Serializable> listState2 = new TestingListState<>();
TestingListState<Serializable> listState3 = new TestingListState<>();
- when(backend.getDefaultPartitionableState(Matchers.any(String.class))).
+ when(backend.getSerializableListState(Matchers.any(String.class))).
thenReturn(listState1, listState1, listState2, listState3);
consumer.initializeState(backend);
@@ -252,7 +252,7 @@ public class FlinkKafkaConsumerBaseTest {
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
- when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+ when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
// create 500 snapshots
for (int i = 100; i < 600; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 2227201..777cb91 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api.checkpoint;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
/**
*
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.OperatorStateStore;
* repartitionable state that needs to be checkpointed. Methods from this interface are called upon checkpointing and
* restoring of state.
*
- * On #initializeState the implementing class receives the {@link org.apache.flink.runtime.state.OperatorStateStore}
+ * On #initializeState the implementing class receives the {@link OperatorStateStore}
* to store its state. At least before each snapshot, all persistent state must be stored in the state store.
*
* When the backend is received for initialization, the user registers states with the backend via
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 428442d..72f30b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -108,7 +108,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
ListState<Serializable> listState = getOperatorStateBackend().
- getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+ getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
List<Serializable> list = new ArrayList<>();
@@ -202,7 +202,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
((ListCheckpointed<Serializable>) userFunction).snapshotState(checkpointId, timestamp);
ListState<Serializable> listState = getOperatorStateBackend().
- getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+ getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
listState.clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index 50cdc02..d2f7e0d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@Deprecated
@@ -45,8 +44,8 @@ public interface StreamCheckpointedOperator {
* This method restores the operator state (if the operator is stateful) and the key/value state
* (if it had been used and was initialized when the snapshot occurred).
*
- * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
- * and before {@link #open()}.
+ * <p>This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
+ * and before {@link StreamOperator#open()}.
*
* @param in The stream from which we have to restore our state.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 88c3ba4..9802a16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -85,7 +85,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
*
* The life cycle of the task is set up as follows:
* <pre>{@code
- * -- getPartitionableState() -> restores state of all operators in the chain
+ * -- getOperatorState() -> restores state of all operators in the chain
*
* -- invoke()
* |
http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 31ccc28..f6e7dca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -559,7 +559,7 @@ public class OneInputStreamTaskTest extends TestLogger {
public void open() throws Exception {
super.open();
- ListState<Integer> partitionableState = getOperatorStateBackend().getPartitionableState(TEST_DESCRIPTOR);
+ ListState<Integer> partitionableState = getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
if (numberSnapshotCalls == 0) {
for (Integer v : partitionableState.get()) {
@@ -582,7 +582,7 @@ public class OneInputStreamTaskTest extends TestLogger {
long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
ListState<Integer> partitionableState =
- getOperatorStateBackend().getPartitionableState(TEST_DESCRIPTOR);
+ getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
partitionableState.clear();
partitionableState.add(42);