You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 20:29:00 UTC

[2/2] flink git commit: [FLINK-4744] [streaming api] Followup: Unify names for operator state access methods and comments.

[FLINK-4744] [streaming api] Followup: Unify names for operator state access methods and comments.

Also make JavaSerializer package private, as it is not intended for use as a proper TypeSerializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10a42f95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10a42f95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10a42f95

Branch: refs/heads/master
Commit: 10a42f951c5143537c28a0f9df65627e5c632c4b
Parents: 56cba7e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Oct 5 15:30:26 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../api/common/state/OperatorStateStore.java    |  61 ++++++++++
 .../java/typeutils/runtime/JavaSerializer.java  | 119 ------------------
 .../state/DefaultOperatorStateBackend.java      |   8 +-
 .../flink/runtime/state/JavaSerializer.java     | 122 +++++++++++++++++++
 .../runtime/state/OperatorStateBackend.java     |   2 +
 .../flink/runtime/state/OperatorStateStore.java |  60 ---------
 .../runtime/state/OperatorStateBackendTest.java |  15 ++-
 .../kafka/FlinkKafkaConsumerBase.java           |   6 +-
 .../kafka/FlinkKafkaProducerBase.java           |   2 +-
 .../kafka/AtLeastOnceProducerTest.java          |   1 -
 .../kafka/FlinkKafkaConsumerBaseTest.java       |  14 +--
 .../api/checkpoint/CheckpointedFunction.java    |   4 +-
 .../operators/AbstractUdfStreamOperator.java    |   6 +-
 .../operators/StreamCheckpointedOperator.java   |   5 +-
 .../streaming/runtime/tasks/StreamTask.java     |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   4 +-
 16 files changed, 217 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
new file mode 100644
index 0000000..03c11f6
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Interface for a backend that manages operator state.
+ */
+public interface OperatorStateStore {
+
+	String DEFAULT_OPERATOR_STATE_NAME = "_default_";
+
+	/**
+	 * Creates (or restores) a list state of the given name that uses Java serialization to
+	 * persist the state.
+	 * 
+	 * <p>This is a simple convenience method. For more flexibility on how state serialization
+	 * should happen, use the {@link #getOperatorState(ListStateDescriptor)} method.
+	 *
+	 * @param stateName The name of state to create
+	 * @return A list state using Java serialization to serialize state objects.
+	 * @throws Exception
+	 */
+	ListState<Serializable> getSerializableListState(String stateName) throws Exception;
+
+	/**
+	 * Creates (or restores) a list state. Each state is registered under a unique name.
+	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
+	 *
+	 * @param stateDescriptor The descriptor for this state, providing a name and serializer.
+	 * @param <S> The generic type of the state
+	 * 
+	 * @return A list for all state partitions.
+	 * @throws Exception
+	 */
+	<S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
+
+	/**
+	 * Returns a set with the names of all currently registered states.
+	 * @return set of names for all registered states.
+	 */
+	Set<String> getRegisteredStateNames();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
deleted file mode 100644
index 3af7653..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final ClassLoader userClassLoader;
-
-	public JavaSerializer() {
-		this(Thread.currentThread().getContextClassLoader());
-	}
-
-	public JavaSerializer(ClassLoader userClassLoader) {
-		this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public TypeSerializer<T> duplicate() {
-		return this;
-	}
-
-	@Override
-	public T createInstance() {
-		return null;
-	}
-
-	@Override
-	public T copy(T from) {
-
-		try {
-			return InstantiationUtil.clone(from);
-		} catch (IOException | ClassNotFoundException e) {
-			throw new RuntimeException("Could not copy instance of " + from + '.', e);
-		}
-	}
-
-	@Override
-	public T copy(T from, T reuse) {
-		return copy(from);
-	}
-
-	@Override
-	public int getLength() {
-		return 0;
-	}
-
-	@Override
-	public void serialize(T record, DataOutputView target) throws IOException {
-		InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
-	}
-
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		try {
-			return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
-		} catch (ClassNotFoundException e) {
-			throw new IOException("Could not deserialize object.", e);
-		}
-	}
-
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		return deserialize(source);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		int size = source.readInt();
-		target.writeInt(size);
-		target.write(source, size);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof JavaSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return getClass().hashCode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index af97a3f..b1ab7e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputView;
@@ -74,15 +74,15 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 	}
 
 	@Override
-	public ListState<Serializable> getDefaultPartitionableState(String stateName) throws Exception {
-		return getPartitionableState(new ListStateDescriptor<>(stateName, javaSerializer));
+	public ListState<Serializable> getSerializableListState(String stateName) throws Exception {
+		return getOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
 	}
 
 	/**
 	 * @see OperatorStateStore
 	 */
 	@Override
-	public <S> ListState<S> getPartitionableState(
+	public <S> ListState<S> getOperatorState(
 			ListStateDescriptor<S> stateDescriptor) throws IOException {
 
 		Preconditions.checkNotNull(stateDescriptor);

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
new file mode 100644
index 0000000..2eb9595
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+@Internal
+final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
+
+	private final ClassLoader userClassLoader;
+
+	public JavaSerializer() {
+		this(Thread.currentThread().getContextClassLoader());
+	}
+
+	public JavaSerializer(ClassLoader userClassLoader) {
+		this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		return this;
+	}
+
+	@Override
+	public T createInstance() {
+		return null;
+	}
+
+	@Override
+	public T copy(T from) {
+
+		try {
+			return InstantiationUtil.clone(from);
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException("Could not copy instance of " + from + '.', e);
+		}
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return copy(from);
+	}
+
+	@Override
+	public int getLength() {
+		return 0;
+	}
+
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		InstantiationUtil.serializeObject(new DataOutputViewStream(target), record);
+	}
+
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		try {
+			return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
+		} catch (ClassNotFoundException e) {
+			throw new IOException("Could not deserialize object.", e);
+		}
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		int size = source.readInt();
+		target.writeInt(size);
+		target.write(source, size);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof JavaSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return getClass().hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
index 4e980b7..83e6369 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateBackend.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.api.common.state.OperatorStateStore;
+
 import java.io.Closeable;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
deleted file mode 100644
index ceab87f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateStore.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
-
-import java.io.Serializable;
-import java.util.Set;
-
-/**
- * Interface for a backend that manages partitionable operator state.
- */
-public interface OperatorStateStore {
-
-	String DEFAULT_OPERATOR_STATE_NAME = "";
-
-	/**
-	 * Creates a satte descriptor of the given name that uses {@link JavaSerializer}.
-	 *
-	 * @param stateName The name of state to create
-	 * @return A state descriptor that uses {@link JavaSerializer}
-	 * @throws Exception
-	 */
-	ListState<Serializable> getDefaultPartitionableState(String stateName) throws Exception;
-
-	/**
-	 * Creates (or restores) the partitionable state in this backend. Each state is registered under a unique name.
-	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
-	 *
-	 * @param stateDescriptor The descriptr for this state, providing a name and serializer
-	 * @param <S> The generic type of the state
-	 * @return A list for all state partitions.
-	 * @throws Exception
-	 */
-	<S> ListState<S> getPartitionableState(ListStateDescriptor<S> stateDescriptor) throws Exception;
-
-	/**
-	 * Returns a set with the names of all currently registered states.
-	 * @return set of names for all registered states.
-	 */
-	Set<String> getRegisteredStateNames();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index ff1a23d..2db8735 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.java.typeutils.runtime.JavaSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.junit.Test;
@@ -61,7 +60,7 @@ public class OperatorStateBackendTest {
 		OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
-		ListState<Serializable> listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
+		ListState<Serializable> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
 		assertNotNull(listState1);
 		assertEquals(1, operatorStateBackend.getRegisteredStateNames().size());
 		Iterator<Serializable> it = listState1.get().iterator();
@@ -74,7 +73,7 @@ public class OperatorStateBackendTest {
 		assertEquals(4711, it.next());
 		assertTrue(!it.hasNext());
 
-		ListState<Serializable> listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+		ListState<Serializable> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
 		assertNotNull(listState2);
 		assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
 		assertTrue(!it.hasNext());
@@ -88,7 +87,7 @@ public class OperatorStateBackendTest {
 		assertEquals(23, it.next());
 		assertTrue(!it.hasNext());
 
-		ListState<Serializable> listState1b = operatorStateBackend.getPartitionableState(stateDescriptor1);
+		ListState<Serializable> listState1b = operatorStateBackend.getOperatorState(stateDescriptor1);
 		assertNotNull(listState1b);
 		listState1b.add(123);
 		it = listState1b.get().iterator();
@@ -115,8 +114,8 @@ public class OperatorStateBackendTest {
 		OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend();
 		ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
 		ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
-		ListState<Serializable> listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
-		ListState<Serializable> listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+		ListState<Serializable> listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+		ListState<Serializable> listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
 
 		listState1.add(42);
 		listState1.add(4711);
@@ -137,8 +136,8 @@ public class OperatorStateBackendTest {
 
 			assertEquals(0, operatorStateBackend.getRegisteredStateNames().size());
 
-			listState1 = operatorStateBackend.getPartitionableState(stateDescriptor1);
-			listState2 = operatorStateBackend.getPartitionableState(stateDescriptor2);
+			listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
+			listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
 
 			assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index a30341b..8d63345 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
@@ -313,7 +313,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		this.stateStore = stateStore;
 
 		ListState<Serializable> offsets =
-				stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+				stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
 		restoreToOffset = new HashMap<>();
 
@@ -333,7 +333,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		} else {
 
 			ListState<Serializable> listState =
-					stateStore.getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+					stateStore.getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 			listState.clear();
 
 			final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 8b87004..f0975dc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
index 766a107..d2d7fca 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorStateStore;
 import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 45b45f0..373d6ab 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -22,7 +22,7 @@ import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -98,7 +98,7 @@ public class FlinkKafkaConsumerBaseTest {
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
-		when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		consumer.prepareSnapshot(17L, 17L);
 
@@ -121,10 +121,10 @@ public class FlinkKafkaConsumerBaseTest {
 
 		FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
 
-		when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(expectedState);
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(expectedState);
 		consumer.initializeState(operatorStateStore);
 
-		when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
 
 		consumer.prepareSnapshot(17L, 17L);
 
@@ -153,7 +153,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState<Serializable> listState = new TestingListState<>();
-		when(operatorStateStore.getDefaultPartitionableState(Matchers.any(String.class))).thenReturn(listState);
+		when(operatorStateStore.getSerializableListState(Matchers.any(String.class))).thenReturn(listState);
 
 		consumer.initializeState(operatorStateStore);
 		consumer.prepareSnapshot(17L, 17L);
@@ -190,7 +190,7 @@ public class FlinkKafkaConsumerBaseTest {
 		TestingListState<Serializable> listState2 = new TestingListState<>();
 		TestingListState<Serializable> listState3 = new TestingListState<>();
 
-		when(backend.getDefaultPartitionableState(Matchers.any(String.class))).
+		when(backend.getSerializableListState(Matchers.any(String.class))).
 				thenReturn(listState1, listState1, listState2, listState3);
 
 		consumer.initializeState(backend);
@@ -252,7 +252,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 		OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
 		TestingListState<Tuple2<KafkaTopicPartition, Long>> listState = new TestingListState<>();
-		when(operatorStateStore.getPartitionableState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
+		when(operatorStateStore.getOperatorState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
 
 		// create 500 snapshots
 		for (int i = 100; i < 600; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 2227201..777cb91 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.checkpoint;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 
 /**
  *
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.OperatorStateStore;
  * repartitionable state that needs to be checkpointed. Methods from this interface are called upon checkpointing and
  * restoring of state.
  *
- * On #initializeState the implementing class receives the {@link org.apache.flink.runtime.state.OperatorStateStore}
+ * On #initializeState the implementing class receives the {@link OperatorStateStore}
  * to store its state. At least before each snapshot, all persistent state must be stored in the state store.
  *
  * When the backend is received for initialization, the user registers states with the backend via

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 428442d..72f30b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.OperatorStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -108,7 +108,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 			ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction;
 
 			ListState<Serializable> listState = getOperatorStateBackend().
-					getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+					getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
 			List<Serializable> list = new ArrayList<>();
 
@@ -202,7 +202,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 					((ListCheckpointed<Serializable>) userFunction).snapshotState(checkpointId, timestamp);
 
 			ListState<Serializable> listState = getOperatorStateBackend().
-					getDefaultPartitionableState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
+					getSerializableListState(OperatorStateStore.DEFAULT_OPERATOR_STATE_NAME);
 
 			listState.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
index 50cdc02..d2f7e0d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCheckpointedOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 @Deprecated
@@ -45,8 +44,8 @@ public interface StreamCheckpointedOperator {
 	 * This method restores the operator state (if the operator is stateful) and the key/value state
 	 * (if it had been used and was initialized when the snapshot occurred).
 	 *
-	 * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)}
-	 * and before {@link #open()}.
+	 * <p>This method is called after {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
+	 * and before {@link StreamOperator#open()}.
 	 *
 	 * @param in The stream from which we have to restore our state.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 88c3ba4..9802a16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -85,7 +85,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
  *
  * The life cycle of the task is set up as follows:
  * <pre>{@code
- *  -- getPartitionableState() -> restores state of all operators in the chain
+ *  -- getOperatorState() -> restores state of all operators in the chain
  *
  *  -- invoke()
  *        |

http://git-wip-us.apache.org/repos/asf/flink/blob/10a42f95/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 31ccc28..f6e7dca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -559,7 +559,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 		public void open() throws Exception {
 			super.open();
 
-			ListState<Integer> partitionableState = getOperatorStateBackend().getPartitionableState(TEST_DESCRIPTOR);
+			ListState<Integer> partitionableState = getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
 
 			if (numberSnapshotCalls == 0) {
 				for (Integer v : partitionableState.get()) {
@@ -582,7 +582,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 				long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
 
 			ListState<Integer> partitionableState =
-					getOperatorStateBackend().getPartitionableState(TEST_DESCRIPTOR);
+					getOperatorStateBackend().getOperatorState(TEST_DESCRIPTOR);
 			partitionableState.clear();
 
 			partitionableState.add(42);