Posted to commits@flink.apache.org by al...@apache.org on 2017/01/30 16:59:42 UTC

[1/2] flink git commit: [FLINK-5456] Resurrect and update parts of the state intro documentation

Repository: flink
Updated Branches:
  refs/heads/release-1.2 c365a34b8 -> 65b1da8c5


[FLINK-5456] Resurrect and update parts of the state intro documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0666786a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0666786a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0666786a

Branch: refs/heads/release-1.2
Commit: 0666786ad87d3befd248cf7b8c59e686ac29af8e
Parents: c365a34
Author: David Anderson <da...@alpinegizmo.com>
Authored: Wed Jan 18 15:56:18 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 30 17:59:14 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state.md | 332 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 314 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0666786a/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0b38a62..124ce68 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -26,11 +26,11 @@ under the License.
 {:toc}
 
 Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for
-any type of more elaborate operation. For example: 
+any type of more elaborate operation. For example:
 
   - When an application searches for certain event patterns, the state will store the sequence of events encountered so far.
   - When aggregating events per minute, the state holds the pending aggregates.
-  - When training a machine learning model over a stream of data points, the state holds the current verstion of the model parameters.
+  - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.
 
 In order to make state fault tolerant, Flink needs to be aware of the state and [checkpoint](checkpointing.html) it.
 In many cases, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk
@@ -39,40 +39,336 @@ if necessary) to allow applications to hold very large state.
 This document explains how to use Flink's state abstractions when developing an application.
 
 
-## Keyed State and Operator state
+## Keyed State and Operator State
 
-There are two basic state backends: `Keyed State` and `Operator State`.
+There are two basic kinds of state in Flink: `Keyed State` and `Operator State`.
 
-#### Keyed State
+### Keyed State
 
 *Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`.
-Examples of keyed state are the `ValueState` or `ListState` that one can create in a function on a `KeyedStream`, as
-well as the state of a keyed window operator.
 
-Keyed State is organized in so called *Key Groups*. Key Groups are the unit by which keyed state can be redistributed and
-there are as many key groups as the defined maximum parallelism.
-During execution each parallel instance of an operator gets one or more key groups.
+You can think of Keyed State as Operator State that has been partitioned,
+or sharded, with exactly one state-partition per key.
+Each keyed-state is logically bound to a unique
+composite of <parallel-operator-instance, key>, and since each key
+"belongs" to exactly one parallel instance of a keyed operator, we can
+think of this simply as <operator, key>.
 
-#### Operator State
+Keyed State is further organized into so-called *Key Groups*. Key Groups are the
+atomic unit by which Flink can redistribute Keyed State;
+there are exactly as many Key Groups as the defined maximum parallelism.
+During execution each parallel instance of a keyed operator works with the keys
+for one or more Key Groups.
 
-*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` interface in Flink 1.0 and Flink 1.1.
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic sugar) for the Operator State.
+### Operator State
 
-Operator State needs special re-distribution schemes when parallelism is changed. There can be different variations of such
-schemes; the following are currently defined:
+With *Operator State* (or *non-keyed state*), each operator state is
+bound to one parallel operator instance.
+The Kafka source connector is a good motivating example for the use of Operator State
+in Flink. Each parallel instance of this Kafka consumer maintains a map
+of topic partitions and offsets as its Operator State.
+
+The Operator State interfaces support redistributing state among
+parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the following are currently defined:
 
   - **List-style redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
     all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
     Each operator gets a sublist, which can be empty, or contain one or more elements.
 
-
 ## Raw and Managed State
 
 *Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
 
 *Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes the states and writes them into the checkpoints.
+Examples are "ValueState", "ListState", etc. Flink's runtime encodes
+the states and writes them into the checkpoints.
 
-*Raw State* is state that users and operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into
+*Raw State* is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into
 the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes.
 
+All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators.
+Using managed state (rather than raw state) is recommended, since with
+managed state Flink is able to automatically redistribute state when the parallelism is
+changed, and also do better memory management.
+
+## Using Managed Keyed State
+
+The managed keyed state interface provides access to different types of state that are all scoped to
+the key of the current input element. This means that this type of state can only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+
+Now, we will first look at the different types of state available and then we will see
+how they can be used in a program. The available state primitives are:
+
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to the key of the input element as mentioned above, so there will possibly be one value
+for each key that the operation sees). The value can be set using `update(T)` and retrieved using
+`T value()`.
+
+* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable`
+over all currently stored elements. Elements are added using `add(T)`; the `Iterable` can
+be retrieved using `Iterable<T> get()`.
+
+* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
+added to the state. The interface is the same as for `ListState` but elements added using
+`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+
+All types of state also have a method `clear()` that clears the state for the currently
+active key, i.e. the key of the input element.
+
+It is important to keep in mind that these state objects are only used for interfacing
+with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depends on the key of the input element. So the value you get in one invocation of your
+user function can differ from the value in another invocation if the keys involved are different.
+
+To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state
+(as we will see later, you can create several states, and they have to have unique names so
+that you can reference them), the type of the values that the state holds, and possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
+want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor` or
+a `ReducingStateDescriptor`.
+
+State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
+Please see [here]({{ site.baseurl }}/dev/api_concepts#rich-functions) for
+information about that, but we will also see an example shortly. The `RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
+
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
+
+This is an example `FlatMapFunction` that shows how all of the parts fit together:
+
+{% highlight java %}
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+    /**
+     * The ValueState handle. The first field is the count, the second field a running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
+
+        // access the state value
+        Tuple2<Long, Long> currentSum = sum.value();
+
+        // update the count
+        currentSum.f0 += 1;
+
+        // add the second field of the input value
+        currentSum.f1 += input.f1;
+
+        // update the state
+        sum.update(currentSum);
+
+        // if the count reaches 2, emit the average and clear the state
+        if (currentSum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
+            sum.clear();
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
+                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
+        sum = getRuntimeContext().getState(descriptor);
+    }
+}
+
+// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
+env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
+        .keyBy(0)
+        .flatMap(new CountWindowAverage())
+        .print();
+
+// the printed output will be (1,4) and (1,5)
+{% endhighlight %}
+
+This example implements a poor man's counting window. We key the tuples by the first field
+(in the example all have the same key `1`). The function stores the count and a running sum in
+a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so that
+we start over from `0`. Note that this would keep a different state value for each different input
+key if we had tuples with different values in the first field.
+
+### State in the Scala DataStream API
+
+In addition to the interface described above, the Scala API has shortcuts for stateful
+`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function
+gets the current value of the `ValueState` in an `Option` and must return an updated value that
+will be used to update the state.
+
+{% highlight scala %}
+val stream: DataStream[(String, Int)] = ...
+
+val counts: DataStream[(String, Int)] = stream
+  .keyBy(_._1)
+  .mapWithState((in: (String, Int), count: Option[Int]) =>
+    count match {
+      case Some(c) => ( (in._1, c), Some(c + in._2) )
+      case None => ( (in._1, 0), Some(in._2) )
+    })
+{% endhighlight %}
+
+## Using Managed Operator State
+
+A stateful function can implement either the more general `CheckpointedFunction`
+interface, or the `ListCheckpointed<T extends Serializable>` interface.
+
+In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
+non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
+contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
+while `(test2, 2)` will go to task 1.
+
+##### ListCheckpointed
+
+The `ListCheckpointed` interface requires the implementation of two methods:
+
+    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+
+    void restoreState(List<T> state) throws Exception;
+
+On `snapshotState()` the operator should return a list of objects to checkpoint and
+`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
+return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
+
+##### CheckpointedFunction
+
+The `CheckpointedFunction` interface also requires the implementation of two methods:
+
+    void snapshotState(FunctionSnapshotContext context) throws Exception;
+
+    void initializeState(FunctionInitializationContext context) throws Exception;
+
+Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, whether the function is being initialized for the first time
+or is recovering from an earlier checkpoint. Given this, `initializeState()` is not
+only the place where different types of state are initialized, but also where state recovery logic is included.
+
+This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
+uses state to buffer elements before sending them to the outside world:
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+
+        private List<Tuple2<String, Integer>> bufferedElements;
+
+        public BufferingSink(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            checkpointedState.clear();
+            for (Tuple2<String, Integer> element : bufferedElements) {
+                checkpointedState.add(element);
+            }
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            checkpointedState = context.getOperatorStateStore().
+                getSerializableListState("buffered-elements");
+
+            if (context.isRestored()) {
+                for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                    bufferedElements.add(element);
+                }
+            }
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+            // this is from the CheckpointedRestoring interface.
+            this.bufferedElements.addAll(state);
+        }
+    }
+
+
+The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
+the non-keyed state "containers". These are containers of type `ListState` where the non-keyed state objects
+are going to be stored upon checkpointing.
+
+`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
+
+After initializing the container, we use the `isRestored()` method of the context to check if we are
+recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
+
+As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
+initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
+of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
+
+As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
+using the provided `FunctionInitializationContext`.
+
+### Stateful Source Functions
+
+Stateful sources require a bit more care than other operators.
+In order to make the updates to the state and output collection atomic (required for exactly-once semantics
+on failure/recovery), the user is required to get a lock from the source's context.
+
+{% highlight java %}
+public static class CounterSource
+        extends RichParallelSourceFunction<Long>
+        implements ListCheckpointed<Long> {
+
+    /** current offset for exactly-once semantics; starts at 0 so that run() never unboxes null */
+    private Long offset = 0L;
+
+    /** flag for job cancellation */
+    private volatile boolean isRunning = true;
+
+    @Override
+    public void run(SourceContext<Long> ctx) {
+        final Object lock = ctx.getCheckpointLock();
+
+        while (isRunning) {
+            // output and state update are atomic
+            synchronized (lock) {
+                ctx.collect(offset);
+                offset += 1;
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+
+    @Override
+    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
+        return Collections.singletonList(offset);
+    }
+
+    @Override
+    public void restoreState(List<Long> state) {
+        for (Long s : state)
+            offset = s;
+    }
+}
+{% endhighlight %}
+
+Some operators need to know when a checkpoint has been fully acknowledged by Flink so that they can communicate this to the outside world. In this case, see the `org.apache.flink.runtime.state.CheckpointListener` interface.
+
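Note on the list-style redistribution described in the state.md patch above: the idea (concatenate all per-instance lists, then divide the result evenly among the new parallel instances) can be sketched without any Flink dependency. This is only an illustration under assumptions. The class `ListRedistributionSketch` and the method `redistribute` are hypothetical names, and the contiguous split used here is one possible way to divide the list evenly; Flink's runtime may assign individual elements to subtasks differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Flink's API.
final class ListRedistributionSketch {

    /**
     * Concatenates the lists returned by the old parallel instances and
     * divides the result into newParallelism sublists of near-equal size.
     */
    public static <T> List<List<T>> redistribute(List<List<T>> oldLists, int newParallelism) {
        // the whole state is logically the concatenation of all lists
        List<T> all = new ArrayList<>();
        for (List<T> part : oldLists) {
            all.addAll(part);
        }

        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;

        List<List<T>> result = new ArrayList<>();
        int from = 0;
        for (int i = 0; i < newParallelism; i++) {
            // the first `remainder` instances receive one extra element
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(from, from + size)));
            from += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // parallelism 1 -> 2, mirroring the BufferingSink example in the patch
        List<List<String>> before = new ArrayList<>();
        before.add(Arrays.asList("(test1, 2)", "(test2, 2)"));
        System.out.println(redistribute(before, 2));

        // parallelism 1 -> 3: the third sublist is empty
        System.out.println(redistribute(before, 3));
    }
}
```

The sketch also shows why each list element has to be independent of the others: an element is the smallest unit that can be moved to a different instance, and a sublist may legitimately be empty after scaling out.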

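Note on Key Groups as described in the state.md patch above: a key is first mapped to one of as many Key Groups as the maximum parallelism, and each parallel instance then works with a range of Key Groups. The following is a minimal sketch of that two-step mapping, not Flink's implementation. The names `KeyGroupSketch`, `keyGroupFor` and `operatorIndexFor` are hypothetical, using the plain `hashCode()` is a simplifying assumption (Flink applies its own hashing), and the contiguous range assignment is an illustrative choice.

```java
// Hypothetical illustration only; not Flink's actual implementation.
final class KeyGroupSketch {

    /** Maps a key to one of maxParallelism key groups (plain hashCode is an assumption). */
    public static int keyGroupFor(Object key, int maxParallelism) {
        // floorMod keeps the result non-negative for negative hash codes
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Assigns each key group to a parallel instance in contiguous ranges. */
    public static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor("user-42", maxParallelism);

        // the key group stays fixed; only the owning instance changes on rescaling
        for (int parallelism : new int[] {2, 4}) {
            int index = operatorIndexFor(keyGroup, maxParallelism, parallelism);
            System.out.println(
                "parallelism=" + parallelism + " keyGroup=" + keyGroup + " instance=" + index);
        }
    }
}
```

Because the Key Group of a key depends only on the key and the maximum parallelism, changing the parallelism never splits a Key Group; whole Key Groups are handed to a different instance, which is what makes them the atomic unit of redistribution.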

[2/2] flink git commit: [hotfix][docs] Fix broken redirect and liquid syntax problem

Posted by al...@apache.org.
[hotfix][docs] Fix broken redirect and liquid syntax problem


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65b1da8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65b1da8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65b1da8c

Branch: refs/heads/release-1.2
Commit: 65b1da8c532304142b763e9de5d00932ae438144
Parents: 0666786
Author: David Anderson <da...@alpinegizmo.com>
Authored: Mon Jan 30 15:29:02 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jan 30 17:59:25 2017 +0100

----------------------------------------------------------------------
 docs/redirects/state.md  | 2 +-
 docs/setup/savepoints.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65b1da8c/docs/redirects/state.md
----------------------------------------------------------------------
diff --git a/docs/redirects/state.md b/docs/redirects/state.md
index 7a74677..15869ba 100644
--- a/docs/redirects/state.md
+++ b/docs/redirects/state.md
@@ -1,7 +1,7 @@
 ---
 title: "Working with State"
 layout: redirect
-redirect: /dev/state.html
+redirect: /dev/stream/state.html
 permalink: /apis/streaming/state.html
 ---
 <!--

http://git-wip-us.apache.org/repos/asf/flink/blob/65b1da8c/docs/setup/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/setup/savepoints.md b/docs/setup/savepoints.md
index eb12dd9..da0784d 100644
--- a/docs/setup/savepoints.md
+++ b/docs/setup/savepoints.md
@@ -27,7 +27,7 @@ under the License.
 
 ## Overview
 
-Savepoints are externally stored checkpoints that you can use to stop-and-resume or update your Flink programs. They use Flink's [checkpointing mechanism]({{ site/baseurl }}/internals/stream_checkpointing.html) to create a snapshot of the state of your streaming program and write the checkpoint meta data out to an external file system.
+Savepoints are externally stored checkpoints that you can use to stop-and-resume or update your Flink programs. They use Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html) to create a snapshot of the state of your streaming program and write the checkpoint meta data out to an external file system.
 
 This page covers all steps involved in triggering, restoring, and disposing savepoints. In order to allow upgrades between programs and Flink versions, it is important to check out the section about [assigning IDs to your operators](#assigning-operator-ids).