You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/21 11:09:15 UTC

[5/6] flink git commit: [FLINK-5821] [state backends] Rename the 'StateBackend' to 'StateBinder' and create root StateBackend interface

[FLINK-5821] [state backends] Rename the 'StateBackend' to 'StateBinder' and create root StateBackend interface

'StateBinder' more accurately reflects what the interface does, and it frees up the name 'StateBackend'.

The 'StateBackend' interface is now the root of the State Backend hierarchy (previously that was 'AbstractStateBackend')

This also significantly extends the JavaDocs of the core state classes, such as StateBackend and StateObject.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f15603d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f15603d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f15603d8

Branch: refs/heads/master
Commit: f15603d81dad4861175093f4ad22eb2f8ccee4a0
Parents: a404796
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 13 14:29:37 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 21 01:16:26 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |   3 +-
 .../state/AggregatingStateDescriptor.java       |   4 +-
 .../common/state/FoldingStateDescriptor.java    |   4 +-
 .../api/common/state/ListStateDescriptor.java   |   4 +-
 .../common/state/ReducingStateDescriptor.java   |   4 +-
 .../flink/api/common/state/StateBackend.java    |  73 ----------
 .../flink/api/common/state/StateBinder.java     |  73 ++++++++++
 .../flink/api/common/state/StateDescriptor.java |   8 +-
 .../api/common/state/ValueStateDescriptor.java  |   4 +-
 .../api/common/state/ListStateDescriptor.java   |   4 +-
 .../state/AbstractKeyedStateBackend.java        |   4 +-
 .../runtime/state/AbstractStateBackend.java     |  31 ++--
 .../flink/runtime/state/StateBackend.java       | 145 +++++++++++++++++++
 .../apache/flink/runtime/state/StateObject.java |  41 ++++--
 14 files changed, 276 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 06cceda..6b09a8a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -19,7 +19,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
@@ -50,7 +49,7 @@ import java.util.UUID;
 import static java.util.Objects.requireNonNull;
 
 /**
- * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
+ * A State Backend that stores its state in {@code RocksDB}. This state backend can
  * store very large state that exceeds memory and spills to disk.
  *
  * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index abdac91..b7378d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -96,8 +96,8 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
 	// ------------------------------------------------------------------------
 
 	@Override
-	public AggregatingState<IN, OUT> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createAggregatingState(this);
+	public AggregatingState<IN, OUT> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createAggregatingState(this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 143945e..73bfaa8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -97,8 +97,8 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public FoldingState<T, ACC> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createFoldingState(this);
+	public FoldingState<T, ACC> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createFoldingState(this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index c03f8cb..ea28ad2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -79,8 +79,8 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	// ------------------------------------------------------------------------
 
 	@Override
-	public ListState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createListState(this);
+	public ListState<T> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createListState(this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index a1d4225..3edf1ca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -86,8 +86,8 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public ReducingState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createReducingState(this);
+	public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createReducingState(this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
deleted file mode 100644
index f9d1af7..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * The {@code StateBackend} is used by {@link StateDescriptor} instances to create actual state
- * representations.
- */
-@PublicEvolving
-public interface StateBackend {
-
-	/**
-	 * Creates and returns a new {@link ValueState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the value that the {@code ValueState} can store.
-	 */
-	<T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ListState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	<T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ReducingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the values that the {@code ReducingState} can store.
-	 */
-	<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link AggregatingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <IN> The type of the values that go into the aggregating state
-	 * @param <ACC> The type of the values that are stored in the aggregating state   
-	 * @param <OUT> The type of the values that come out of the aggregating state   
-	 */
-	<IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
-			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link FoldingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state
-	 */
-	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
new file mode 100644
index 0000000..08dfc90
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual
+ * {@link State} objects.
+ */
+@Internal
+public interface StateBinder {
+
+	/**
+	 * Creates and returns a new {@link ValueState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> The type of the value that the {@code ValueState} can store.
+	 */
+	<T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link ListState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> The type of the values that the {@code ListState} can store.
+	 */
+	<T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link ReducingState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> The type of the values that the {@code ReducingState} can store.
+	 */
+	<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link AggregatingState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <IN> The type of the values that go into the aggregating state
+	 * @param <ACC> The type of the values that are stored in the aggregating state   
+	 * @param <OUT> The type of the values that come out of the aggregating state   
+	 */
+	<IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
+			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link FoldingState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> Type of the values folded into the state
+	 * @param <ACC> Type of the value in the state
+	 */
+	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index bc909e6..332e649 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -39,7 +39,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
  * {@link State} in stateful operations. This contains the name and can create an actual state
- * object given a {@link StateBackend} using {@link #bind(StateBackend)}.
+ * object given a {@link StateBinder} using {@link #bind(StateBinder)}.
  *
  * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
  *
@@ -208,11 +208,11 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	}
 
 	/**
-	 * Creates a new {@link State} on the given {@link StateBackend}.
+	 * Creates a new {@link State} on the given {@link StateBinder}.
 	 *
-	 * @param stateBackend The {@code StateBackend} on which to create the {@link State}.
+	 * @param stateBinder The {@code StateBinder} with which to create the {@link State}.
 	 */
-	public abstract S bind(StateBackend stateBackend) throws Exception;
+	public abstract S bind(StateBinder stateBinder) throws Exception;
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index b3006c4..3afc8a7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -124,8 +124,8 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public ValueState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createValueState(this);
+	public ValueState<T> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createValueState(this);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
index 28bc812..3b1af54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -20,7 +20,7 @@ package org.apache.flink.migration.api.common.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateBinder;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -70,7 +70,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public ListState<T> bind(StateBackend stateBackend) throws Exception {
+	public ListState<T> bind(StateBinder stateBinder) throws Exception {
 		throw new IllegalStateException("Cannot bind states with a legacy state descriptor.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index c8e0d0d..fe5d1cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateBinder;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -264,7 +264,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		}
 
 		// create a new blank key/value state
-		S state = stateDescriptor.bind(new StateBackend() {
+		S state = stateDescriptor.bind(new StateBinder() {
 			@Override
 			public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
 				return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 60d035a..bc4594a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
@@ -26,27 +27,18 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import java.io.IOException;
 
 /**
- * A state backend defines how state is stored and snapshotted during checkpoints.
+ * An abstract base implementation of the {@link StateBackend} interface.
  */
-public abstract class AbstractStateBackend implements java.io.Serializable {
+@PublicEvolving
+public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
 	private static final long serialVersionUID = 4620415814639230247L;
 
-	/**
-	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
-	 * that should end up in a checkpoint.
-	 *
-	 * @param jobId              The {@link JobID} of the job for which we are creating checkpoint streams.
-	 * @param operatorIdentifier An identifier of the operator for which we create streams.
-	 */
+	@Override
 	public abstract CheckpointStreamFactory createStreamFactory(
 			JobID jobId,
-			String operatorIdentifier
-	) throws IOException;
+			String operatorIdentifier) throws IOException;
 
-	/**
-	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for keeping keyed state
-	 * and can be checkpointed to checkpoint streams.
-	 */
+	@Override
 	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,
@@ -54,16 +46,13 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry
-	) throws Exception;
+			TaskKvStateRegistry kvStateRegistry) throws Exception;
 
-	/**
-	 * Creates a new {@link OperatorStateBackend} that can be used for storing partitionable operator
-	 * state in checkpoint streams.
-	 */
+	@Override
 	public OperatorStateBackend createOperatorStateBackend(
 			Environment env,
 			String operatorIdentifier) throws Exception {
+
 		return new DefaultOperatorStateBackend(env.getUserClassLoader());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
new file mode 100644
index 0000000..846df89
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+
+import java.io.IOException;
+
+/**
+ * A <b>State Backend</b> defines how the state of a streaming application is stored and
+ * checkpointed. Different State Backends store their state in different fashions, and use
+ * different data structures to hold the state of a running application.
+ *
+ * <p>For example, the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend memory state backend}
+ * keeps working state in the memory of the TaskManager and stores checkpoints in the memory of the
+ * JobManager. The backend is lightweight and has no additional dependencies, but it is not highly
+ * available and supports only small state.
+ *
+ * <p>The {@link org.apache.flink.runtime.state.filesystem.FsStateBackend file system state backend}
+ * keeps working state in the memory of the TaskManager and stores state checkpoints in a filesystem
+ * (typically a replicated highly-available filesystem, like <a href="https://hadoop.apache.org/">HDFS</a>,
+ * <a href="https://ceph.com/">Ceph</a>, <a href="https://aws.amazon.com/documentation/s3/">S3</a>,
+ * <a href="https://cloud.google.com/storage/">GCS</a>, etc).
+ * 
+ * <p>The {@code RocksDBStateBackend} stores working state in <a href="http://rocksdb.org/">RocksDB</a>,
+ * and checkpoints the state by default to a filesystem (similar to the {@code FsStateBackend}).
+ * 
+ * <h2>Raw Bytes Storage and Backends</h2>
+ * 
+ * The {@code StateBackend} creates services for <i>raw bytes storage</i> and for <i>keyed state</i>
+ * and <i>operator state</i>.
+ * 
+ * <p>The <i>raw bytes storage</i> (through the {@link CheckpointStreamFactory}) is the fundamental
+ * service that simply stores bytes in a fault tolerant fashion. This service is used by the JobManager
+ * to store checkpoint and recovery metadata and is typically also used by the keyed- and operator state
+ * backends to store checkpointed state.
+ *
+ * <p>The {@link AbstractKeyedStateBackend} and {@link OperatorStateBackend} created by this state
+ * backend define how to hold the working state for keys and operators. They also define how to checkpoint
+ * that state, frequently using the raw bytes storage (via the {@code CheckpointStreamFactory}).
+ * However, it is also possible that for example a keyed state backend simply implements the bridge to
+ * a key/value store, and that it does not need to store anything in the raw byte storage upon a
+ * checkpoint.
+ * 
+ * <h2>Serializability</h2>
+ * 
+ * State Backends need to be {@link java.io.Serializable serializable}, because they are distributed
+ * across parallel processes (for distributed execution) together with the streaming application code.
+ * 
+ * <p>Because of that, {@code StateBackend} implementations (typically subclasses
+ * of {@link AbstractStateBackend}) are meant to be like <i>factories</i> that create the proper
+ * state stores that provide access to the persistent storage and hold the keyed- and operator
+ * state data structures. That way, the State Backend can be very lightweight (contain only
+ * configurations) which makes it easier to be serializable.
+ * 
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * State backend implementations have to be thread-safe. Multiple threads may be creating
+ * streams and keyed-/operator state backends concurrently.
+ */
+@PublicEvolving
+public interface StateBackend extends java.io.Serializable {
+
+	// ------------------------------------------------------------------------
+	//  Persistent Bytes Storage
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
+	 * that should end up in a checkpoint.
+	 *
+	 * @param jobId              The {@link JobID} of the job for which we are creating checkpoint streams.
+	 * @param operatorIdentifier An identifier of the operator for which we create streams.
+	 */
+	CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException;
+
+	// ------------------------------------------------------------------------
+	//  Structure Backends 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
+	 * and checkpointing it.
+	 * 
+	 * <p><i>Keyed State</i> is state where each value is bound to a key.
+	 * 
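+	 * @param env The runtime environment of the executing task.
+	 * @param jobID The ID of the job that the task belongs to.
+	 * @param operatorIdentifier The identifier of the operator whose state should be stored.
+	 * @param keySerializer The serializer for the keys.
+	 * @param numberOfKeyGroups The number of key groups.
+	 * @param keyGroupRange The range of key groups for which the backend holds state.
+	 * @param kvStateRegistry The task's registry for key/value state.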
+	 * 
+	 * @param <K> The type of the keys by which the state is organized.
+	 *     
+	 * @return The Keyed State Backend for the given job, operator, and key group range.
+	 * 
+	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
+	 */
+	<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry) throws Exception;
+
+	/**
+	 * Creates a new {@link OperatorStateBackend} that can be used for storing operator state.
+	 * 
+	 * <p>Operator state is state that is associated with parallel operator (or function) instances,
+	 * rather than with keys.
+	 * 
+	 * @param env The runtime environment of the executing task.
+	 * @param operatorIdentifier The identifier of the operator whose state should be stored.
+	 * 
+	 * @return The OperatorStateBackend for operator identified by the job and operator identifier.
+	 * 
+	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
+	 */
+	OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index 9ff2fa8..7f1dd18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -19,28 +19,45 @@
 package org.apache.flink.runtime.state;
 
 /**
- * Base of all types that represent checkpointed state. Specializations are for
- * example {@link StateHandle StateHandles} (directly resolve to state).
+ * Base of all handles that represent checkpointed state in some form. The object may hold
+ * the (small) state directly, or contain a file path (state is in the file), or contain the
+ * metadata to access the state stored in some external database.
  *
- * <p>State objects define how to:
- * <ul>
- *     <li><b>Discard State</b>: The {@link #discardState()} method defines how state is permanently
- *         disposed/deleted. After that method call, state may not be recoverable any more.</li>
- * </ul>
+ * <p>State objects define how to {@link #discardState() discard state} and how to access the
+ * {@link #getStateSize() size of the state}.
+ * 
+ * <p>State Objects are transported via RPC between <i>JobManager</i> and
+ * <i>TaskManager</i> and must be {@link java.io.Serializable serializable} to support that.
+ * 
+ * <p>Some State Objects are stored in the checkpoint/savepoint metadata. For long-term
+ * compatibility, they are not stored via {@link java.io.Serializable Java Serialization},
+ * but through custom serializers.
  */
 public interface StateObject extends java.io.Serializable {
 
 	/**
 	 * Discards the state referred to by this handle, to free up resources in
-	 * the persistent storage. This method is called when the handle will not be
-	 * used any more.
+	 * the persistent storage. This method is called when the state represented by this
+	 * object will not be used any more.
 	 */
 	void discardState() throws Exception;
 
 	/**
-	 * Returns the size of the state in bytes.
-	 *
-	 * <p>If the the size is not known, return {@code 0}.
+	 * Returns the size of the state in bytes. If the size is not known, this
+	 * method should return {@code 0}.
+	 * 
+	 * <p>The values produced by this method are only used for informational purposes and
+	 * for metrics/monitoring. If this method returns wrong values, the checkpoints and recovery
+	 * will still behave correctly. However, efficiency may be impacted (wrong space pre-allocation)
+	 * and functionality that depends on metrics (like monitoring) will be impacted.
+	 * 
+	 * <p>Note for implementors: This method should not perform any I/O operations
+	 * while obtaining the state size (hence it does not declare throwing an {@code IOException}).
+	 * Instead, the state size should be stored in the state object, or should be computable from
+	 * the state stored in this object.
+	 * The reason is that this method is called frequently by several parts of the checkpointing
+	 * mechanism, and issuing I/O requests from this method would put a heavy I/O load on the storage
+	 * system at higher scale.
 	 *
 	 * @return Size of the state in bytes.
 	 */