Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/25 19:21:44 UTC

[11/12] flink git commit: [streaming] [docs] Updated streaming guide for new state interfaces

[streaming] [docs] Updated streaming guide for new state interfaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a4144e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a4144e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a4144e6

Branch: refs/heads/master
Commit: 0a4144e6662ece6b4c0ebec37b49d150aa37e4ce
Parents: 56ae08e
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Jun 20 13:35:33 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Jun 25 16:38:07 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    | 56 ++++++++++----------
 .../flink/api/common/state/OperatorState.java   | 12 ++---
 .../runtime/tasks/StreamingRuntimeContext.java  |  2 +-
 3 files changed, 35 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a4144e6/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index e9fc264..997a245 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1188,38 +1188,42 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
 Stateful computation
 ------------
 
-Flink supports the checkpointing and persistence of user defined state, so in case of a failure this state can be restored to the latest checkpoint and the processing can continue from there. This gives exactly once semantics for anything that is stored in the state when the sources are stateful as well and checkpoint their current offset. The `PersistentKafkaSource` provides this stateful functionality for example. 
+Flink supports checkpointing and persisting user-defined operator state, so after a failure the state can be restored from the latest checkpoint and processing continues from there. This gives exactly-once processing semantics with respect to the operator states, provided the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their `OperatorState`. The `PersistentKafkaSource` provides this stateful functionality for reading streams from Kafka.
 
-For example when implementing a rolling count over the stream Flink gives you the possibility to safely store the counter. Another common usecase is when reading from a Kafka source to save the latest committed offset to catch up from. To mark a function for checkpointing it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or preferably its special case where the checkpointing can be done asynchronously, `CheckpointedAsynchronously`. 
+Flink supports two ways of accessing operator states: partitioned and non-partitioned state access.
+In case of non-partitioned state access, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, each parallel instance returns its own separate state. In practice this means that if we keep a counter for the received inputs in a mapper, `getState()` returns the number of inputs processed by that parallel mapper instance.
 
-Checkpointing can be enabled from the `StreamExecutionEnvironment` using the `enableCheckpointing(…)` where additional parameters can be passed to modify the default 5 second checkpoint interval.
+In case of partitioned state access the user needs to define a `KeyExtractor` which will assign a key to each input of the stateful operator:
+
+`stream.map(counter).setStatePartitioner(…)`
+
+A separate `OperatorState` is maintained for each received key which can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.
+
+Checkpointing of the states needs to be enabled on the `StreamExecutionEnvironment` by calling `enableCheckpointing(…)`, which accepts additional parameters to modify the default 5 second checkpoint interval.
+
+Operator states can be accessed from the `RuntimeContext` using the `getOperatorState("name", defaultValue)` method, so they are only accessible in `RichFunction`s. A recommended usage pattern is to retrieve the operator state in the `open(…)` method of the operator and set it as a field in the operator instance for runtime usage. Multiple `OperatorState`s can be used simultaneously by the same operator by using different names to identify them.
+
+By default operator states are checkpointed using default Java serialization, so they need to be `Serializable`. The user can gain more control over the state checkpoint mechanism by passing a `StateCheckpointer` instance when retrieving the `OperatorState` from the `RuntimeContext`. The `StateCheckpointer` allows custom implementations of the checkpointing logic for increased efficiency and for storing arbitrary non-serializable states.
 
 By default state checkpoints will be stored in-memory at the JobManager. Flink also supports storing the checkpoints on any flink-supported file system (such as HDFS or Tachyon) which can be set in the flink-conf.yaml. Note that the state backend must be accessible from the JobManager, use `file://` only for local setups.
 
 For example, let us write a reduce function that, besides summing the data, also counts how many elements it has seen.
 
 {% highlight java %}
-public class CounterSum implements ReduceFunction<Long>, CheckpointedAsynchronously<Long> {
+public class CounterSum extends RichReduceFunction<Long> {
     
     //persistent counter
-    private long counter = 0;
+    private OperatorState<Long> counter;
 
     @Override
     public Long reduce(Long value1, Long value2) throws Exception {
-        counter++;
+        counter.updateState(counter.getState() + 1);
         return value1 + value2;
     }
 
-    // regularly persists state during normal operation
-    @Override
-    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-        return new Long(counter);
-    }
-
-    // restores state on recovery from failure
     @Override
-    public void restoreState(Serializable state) {
-        counter = (Long) state;
+    public void open(Configuration config) {
+        counter = getRuntimeContext().getOperatorState("counter", 0L);
     }
 }
 {% endhighlight %} 
@@ -1227,12 +1231,13 @@ public class CounterSum implements ReduceFunction<Long>, CheckpointedAsynchronou
 Stateful sources require a bit more care than other operators: they are not data driven, and their `run(SourceContext)` methods potentially run infinitely. In order to make the updates to the state and the output collection atomic, the user is required to get a lock from the source's context.
 
 {% highlight java %}
-public static class CounterSource implements SourceFunction<Long>, CheckpointedAsynchronously<Long> {
+public static class CounterSource extends RichParallelSourceFunction<Long> {
 
     // utility for job cancellation
     private volatile boolean isRunning = false;
     
-    private long counter;
+    // maintain the current offset for exactly once semantics
+    private OperatorState<Long> offset;
     
     @Override
     public void run(SourceContext<Long> ctx) throws Exception {
@@ -1242,25 +1247,20 @@ public static class CounterSource implements SourceFunction<Long>, CheckpointedA
         while (isRunning) {
             // output and state update are atomic
             synchronized (lock){
-                ctx.collect(counter);
-                counter++;
+                ctx.collect(offset.getState());
+                offset.updateState(offset.getState() + 1);
             }
         }
     }
 
     @Override
-    public void cancel() {
-        isRunning = false;
+    public void open(Configuration config) {
+        offset = getRuntimeContext().getOperatorState("offset", 0L);
     }
 
     @Override
-    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-        return new Long(counter);
-    }
-
-    @Override
-    public void restoreState(Serializable state) {
-        counter = (Long) state;
+    public void cancel() {
+        isRunning = false;
     }
 }
 {% endhighlight %}
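
Note on the guide text in this diff: the difference between non-partitioned and partitioned state access can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink's API. `InstanceCounter`, `KeyedCounter`, and the first-letter key extractor are hypothetical names introduced only for illustration; the real mechanism is the `OperatorState` returned by `getOperatorState(...)` together with `setStatePartitioner(…)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only; none of these classes exist in Flink.
public class PartitionedCounterSketch {

    /** Non-partitioned access: one value per parallel instance, shared by all inputs. */
    static final class InstanceCounter {
        private long count = 0L;

        long countInput(String input) {
            return ++count;
        }
    }

    /** Partitioned access: one independent value per key extracted from the input. */
    static final class KeyedCounter {
        private final Function<String, String> keyExtractor;
        private final Map<String, Long> countsByKey = new HashMap<>();

        KeyedCounter(Function<String, String> keyExtractor) {
            this.keyExtractor = keyExtractor;
        }

        long countInput(String input) {
            // merge() stores and returns the updated count for this key
            return countsByKey.merge(keyExtractor.apply(input), 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        String[] inputs = {"apple", "avocado", "banana"};
        InstanceCounter perInstance = new InstanceCounter();
        // Hypothetical key: the first letter of each word
        KeyedCounter perKey = new KeyedCounter(word -> word.substring(0, 1));
        for (String input : inputs) {
            System.out.println(input
                    + ": instance=" + perInstance.countInput(input)
                    + ", key=" + perKey.countInput(input));
        }
    }
}
```

In the sketch the instance counter keeps growing with every input, while the keyed counter starts from zero for each new key, which mirrors the per-instance and per-key behavior the guide describes.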

http://git-wip-us.apache.org/repos/asf/flink/blob/0a4144e6/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 9155037..926c190 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.common.functions.MapFunction;
  * non-partitioned user states.
  * 
  * State can be accessed and manipulated using the {@link #getState()} and
- * {@link #updateState(T)} methods. These calls are only valid in the
+ * {@link #updateState(T)} methods. These calls are only safe in the
  * transformation call the operator represents, for instance inside
- * {@link MapFunction#map()} and invalid in
+ * {@link MapFunction#map()} and can lead to unexpected behavior in the
  * {@link #open(org.apache.flink.configuration.Configuration)} or
  * {@link #close()} methods.
  * 
@@ -39,10 +39,10 @@ public interface OperatorState<T> {
 
 	/**
 	 * Gets the current state for the operator. When the state is not
-	 * partitioned the returned state is the same for all inputs. If state
-	 * partitioning is applied the state returned depends on the current
-	 * operator input, as the operator maintains an independent state for each
-	 * partition.
+	 * partitioned the returned state is the same for all inputs in a given
+	 * operator instance. If state partitioning is applied, the state returned
+	 * depends on the current operator input, as the operator maintains an
+	 * independent state for each partition.
 	 * 
 	 * @return The operator state corresponding to the current input.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0a4144e6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 15e5f1a..14ea5ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -114,7 +114,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	}
 
 	/**
-	 * Creates an empty state depending on the partitioning state.
+	 * Creates an empty {@link OperatorState}.
 	 * 
 	 * @return An empty operator state.
 	 */