Posted to commits@flink.apache.org by se...@apache.org on 2017/01/16 10:55:28 UTC

[05/10] flink git commit: [FLINK-5456] [docs] Add stub for types of state and state interfaces

[FLINK-5456] [docs] Add stub for types of state and state interfaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58509531
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58509531
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58509531

Branch: refs/heads/release-1.2
Commit: 585095312a59fee953d6b370db0a939a8392dd19
Parents: ac193d6
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 10 12:31:21 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 11:53:24 2017 +0100

----------------------------------------------------------------------
 docs/dev/state.md                | 362 ----------------------------------
 docs/dev/stream/checkpointing.md | 152 ++++++++++++++
 docs/dev/stream/state.md         |  78 ++++++++
 docs/internals/state_backends.md |  71 -------
 4 files changed, 230 insertions(+), 433 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58509531/docs/dev/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/state.md b/docs/dev/state.md
deleted file mode 100644
index 4478bfc..0000000
--- a/docs/dev/state.md
+++ /dev/null
@@ -1,362 +0,0 @@
----
-title: "State & Checkpointing"
-nav-parent_id: streaming
-nav-id: state
-nav-pos: 40
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-All transformations in Flink may look like functions (in the functional processing terminology), but
-are in fact stateful operators. You can make *every* transformation (`map`, `filter`, etc) stateful
-by using Flink's state interface or checkpointing instance fields of your function. You can register
-any instance field
-as ***managed*** state by implementing an interface. In this case, and also in the case of using
-Flink's native state interface, Flink will automatically take consistent snapshots of your state
-periodically, and restore its value in the case of a failure.
-
-The end effect is that updates to any form of state are the same under failure-free execution and
-execution under failures.
-
-First, we look at how to make instance fields consistent under failures, and then we look at
-Flink's state interface.
-
-By default state checkpoints will be stored in-memory at the JobManager. For proper persistence of large
-state, Flink supports storing the checkpoints on file systems (HDFS, S3, or any mounted POSIX file system),
-which can be configured in the `flink-conf.yaml` or via `StreamExecutionEnvironment.setStateBackend(…)`.
-See [state backends]({{ site.baseurl }}/ops/state_backends.html) for information
-about the available state backends and how to configure them.
-
-* ToC
-{:toc}
-
-Enabling Checkpointing
--------------------------
-
-Flink has a checkpointing mechanism that recovers streaming jobs after failures. The checkpointing mechanism requires a *persistent* (or *durable*) source that
-can be asked for prior records again (Apache Kafka is a good example of such a source).
-
-The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the user-defined state (see [Working with State]({{ site.baseurl }}/dev/state.html)) consistently to provide *exactly once* processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured [state backend]({{ site.baseurl }}/ops/state_backends.html).
-
-The [docs on streaming fault tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html) describe in detail the technique behind Flink's streaming fault tolerance mechanism.
-
-By default, checkpointing is disabled. To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
-- *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
-  Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
-
-- *number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints (100s of milliseconds) to re-process very little upon failures.
-
-- *checkpoint timeout*: The time after which a checkpoint-in-progress is aborted, if it did not complete by then.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000);
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig().setCheckpointTimeout(60000);
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000)
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig.setCheckpointTimeout(60000)
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-## Using the Key/Value State Interface
-
-The Key/Value state interface provides access to different types of state that are all scoped to
-the key of the current input element. This means that this type of state can only be used
-on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
-
-Now, we will first look at the different types of state available and then we will see
-how they can be used in a program. The available state primitives are:
-
-* `ValueState<T>`: This keeps a value that can be updated and
-retrieved (scoped to key of the input element, mentioned above, so there will possibly be one value
-for each key that the operation sees). The value can be set using `update(T)` and retrieved using
-`T value()`.
-
-* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable`
-over all currently stored elements. Elements are added using `add(T)`, the Iterable can
-be retrieved using `Iterable<T> get()`.
-
-* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
-added to the state. The interface is the same as for `ListState` but elements added using
-`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
-
-All types of state also have a method `clear()` that clears the state for the currently
-active key (i.e. the key of the input element).
-
-It is important to keep in mind that these state objects are only used for interfacing
-with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
-The second thing to keep in mind is that the value you get from the state
-depends on the key of the input element. So the value you get in one invocation of your
-user function can differ from the value in another invocation if the keys involved are different.
-
-To get a state handle you have to create a `StateDescriptor`. This holds the name of the state
-(as we will later see you can create several states, and they have to have unique names so
-that you can reference them), the type of the values that the state holds, and possibly
-a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
-want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor` or
-a `ReducingStateDescriptor`.
-
-State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
-Please see [here]({{ site.baseurl }}/dev/api_concepts#rich-functions) for
-information about that, but we will also see an example shortly. The `RuntimeContext` that
-is available in a `RichFunction` has these methods for accessing state:
-
-* `ValueState<T> getState(ValueStateDescriptor<T>)`
-* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
-* `ListState<T> getListState(ListStateDescriptor<T>)`
-
-This is an example `FlatMapFunction` that shows how all of the parts fit together:
-
-{% highlight java %}
-public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-    /**
-     * The ValueState handle. The first field is the count, the second field a running sum.
-     */
-    private transient ValueState<Tuple2<Long, Long>> sum;
-
-    @Override
-    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
-
-        // access the state value
-        Tuple2<Long, Long> currentSum = sum.value();
-
-        // update the count
-        currentSum.f0 += 1;
-
-        // add the second field of the input value
-        currentSum.f1 += input.f1;
-
-        // update the state
-        sum.update(currentSum);
-
-        // if the count reaches 2, emit the average and clear the state
-        if (currentSum.f0 >= 2) {
-            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
-            sum.clear();
-        }
-    }
-
-    @Override
-    public void open(Configuration config) {
-        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
-                new ValueStateDescriptor<>(
-                        "average", // the state name
-                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
-                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
-        sum = getRuntimeContext().getState(descriptor);
-    }
-}
-
-// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
-env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
-        .keyBy(0)
-        .flatMap(new CountWindowAverage())
-        .print();
-
-// the printed output will be (1,4) and (1,5)
-{% endhighlight %}
-
-This example implements a poor man's counting window. We key the tuples by the first field
-(in the example all have the same key `1`). The function stores the count and a running sum in
-a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so that
-we start over from `0`. Note that this would keep a different state value for each different input
-key if we had tuples with different values in the first field.
-
-### State in the Scala DataStream API
-
-In addition to the interface described above, the Scala API has shortcuts for stateful
-`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function
-gets the current value of the `ValueState` in an `Option` and must return an updated value that
-will be used to update the state.
-
-{% highlight scala %}
-val stream: DataStream[(String, Int)] = ...
-
-val counts: DataStream[(String, Int)] = stream
-  .keyBy(_._1)
-  .mapWithState((in: (String, Int), count: Option[Int]) =>
-    count match {
-      case Some(c) => ( (in._1, c), Some(c + in._2) )
-      case None => ( (in._1, 0), Some(in._2) )
-    })
-{% endhighlight %}
-
-## Checkpointing Instance Fields
-
-Instance fields can be checkpointed by using the `Checkpointed` interface.
-
-When the user-defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)`
-methods will be executed to draw and restore function state.
-
-In addition to that, user functions can also implement the `CheckpointListener` interface to receive notifications on
-completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method.
-Note that there is no guarantee for the user function to receive a notification if a failure happens between
-checkpoint completion and notification. The notifications should hence be treated in a way that notifications from
-later checkpoints can subsume missing notifications.
-
-The above example for `ValueState` can be implemented using instance fields like this:
-
-{% highlight java %}
-
-public class CountWindowAverage
-        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-        implements Checkpointed<Tuple2<Long, Long>> {
-
-    private Tuple2<Long, Long> sum = null;
-
-    @Override
-    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
-
-        // update the count
-        sum.f0 += 1;
-
-        // add the second field of the input value
-        sum.f1 += input.f1;
-
-
-        // if the count reaches 2, emit the average and clear the state
-        if (sum.f0 >= 2) {
-            out.collect(new Tuple2<>(input.f0, sum.f1 / sum.f0));
-            sum = Tuple2.of(0L, 0L);
-        }
-    }
-
-    @Override
-    public void open(Configuration config) {
-        if (sum == null) {
-            // only recreate if null
-            // restoreState will be called before open()
-            // so this will already set the sum to the restored value
-            sum = Tuple2.of(0L, 0L);
-        }
-    }
-
-    // regularly persists state during normal operation
-    @Override
-    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
-        return sum;
-    }
-
-    // restores state on recovery from failure
-    @Override
-    public void restoreState(Tuple2<Long, Long> state) {
-        sum = state;
-    }
-}
-{% endhighlight %}
-
-## Stateful Source Functions
-
-Stateful sources require a bit more care as opposed to other operators.
-In order to make the updates to the state and output collection atomic (required for exactly-once semantics
-on failure/recovery), the user is required to get a lock from the source's context.
-
-{% highlight java %}
-public static class CounterSource
-        extends RichParallelSourceFunction<Long>
-        implements Checkpointed<Long> {
-
-    /**  current offset for exactly once semantics */
-    private long offset;
-
-    /** flag for job cancellation */
-    private volatile boolean isRunning = true;
-
-    @Override
-    public void run(SourceContext<Long> ctx) {
-        final Object lock = ctx.getCheckpointLock();
-
-        while (isRunning) {
-            // output and state update are atomic
-            synchronized (lock) {
-                ctx.collect(offset);
-                offset += 1;
-            }
-        }
-    }
-
-    @Override
-    public void cancel() {
-        isRunning = false;
-    }
-
-    @Override
-    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-        return offset;
-
-    }
-
-    @Override
-	public void restoreState(Long state) {
-        offset = state;
-    }
-}
-{% endhighlight %}
-
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
-
-## State Checkpoints in Iterative Jobs
-
-Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`.
-
-Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
-
-{% top %}
-
-## Restart Strategies
-
-Flink supports different restart strategies which control how the jobs are restarted in case of a failure. For more 
-information, see [Restart Strategies]({{ site.baseurl }}/dev/restart_strategies.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/58509531/docs/dev/stream/checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/checkpointing.md b/docs/dev/stream/checkpointing.md
new file mode 100644
index 0000000..774d9ef
--- /dev/null
+++ b/docs/dev/stream/checkpointing.md
@@ -0,0 +1,152 @@
+---
+title: "Checkpointing"
+nav-parent_id: streaming
+nav-pos: 50
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* ToC
+{:toc}
+
+Every function and operator in Flink can be **stateful** (see [working with state](state.html) for details).
+Stateful functions store data across the processing of individual elements/events, making state a critical building block for
+any type of more elaborate operation.
+
+In order to make state fault tolerant, Flink needs to **checkpoint** the state. Checkpoints allow Flink to recover state and positions
+in the streams to give the application the same semantics as a failure-free execution.
+
+The [documentation on streaming fault tolerance](../../internals/stream_checkpointing.html) describes in detail the technique behind Flink's streaming fault tolerance mechanism.
+
+
+## Prerequisites
+
+Flink's checkpointing mechanism interacts with durable storage for streams and state. In general, it requires:
+
+  - A *persistent* (or *durable*) data source that can replay records for a certain amount of time. Examples of such sources are persistent message queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...).
+  - A persistent storage for state, typically a distributed filesystem (e.g., HDFS, S3, GFS, NFS, Ceph, ...)
+
+
+## Enabling and Configuring Checkpointing
+
+By default, checkpointing is disabled. To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds.
+
+Other parameters for checkpointing include:
+
+  - *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels.
+    Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.
+
+  - *checkpoint timeout*: The time after which a checkpoint-in-progress is aborted, if it did not complete by then.
+
+  - *minimum time between checkpoints*: To make sure that the streaming application makes a certain amount of progress between checkpoints,
+    one can define how much time needs to pass between checkpoints. If this value is set for example to *5000*, the next checkpoint will be
+    started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval.
+    Note that this implies that the checkpoint interval will never be smaller than this parameter.
+    
+    It is often easier to configure applications by defining the "time between checkpoints" than the checkpoint interval, because the "time between checkpoints"
+    is not susceptible to the fact that checkpoints may sometimes take longer than on average (for example if the target storage system is temporarily slow).
+
+    Note that this value also implies that the number of concurrent checkpoints is *one*.
+
+  - *number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress.
+    This ensures that the topology does not spend so much time on checkpoints that it fails to make progress with processing the streams.
+    It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay
+    (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints
+    (100s of milliseconds) to re-process very little upon failures.
+
+    This option cannot be used when a minimum time between checkpoints is defined.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000);
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+// make sure 500 ms of progress happen between checkpoints
+env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig().setCheckpointTimeout(60000);
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000)
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+// make sure 500 ms of progress happen between checkpoints
+env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig.setCheckpointTimeout(60000)
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+
+## Selecting a State Backend
+
+The checkpointing mechanism stores the progress in the data sources and data sinks, the state of windows, as well as the [user-defined state](state.html) consistently to
+provide *exactly once* processing semantics. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured
+**State Backend**. 
+
+By default state will be kept in memory, and checkpoints will be stored in-memory at the master node (the JobManager). For proper persistence of large state,
+Flink supports various forms of storing and checkpointing state in so-called **State Backends**, which can be set via `StreamExecutionEnvironment.setStateBackend(…)`.
+
+See [state backends](../../ops/state_backends.html) for more details on the available state backends and options for job-wide and cluster-wide configuration.
+
+
+## State Checkpoints in Iterative Jobs
+
+Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`.
+
+Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
+
+{% top %}
+
+
+## Restart Strategies
+
+Flink supports different restart strategies which control how the jobs are restarted in case of a failure. For more 
+information, see [Restart Strategies]({{ site.baseurl }}/dev/restart_strategies.html).
+
+{% top %}
+
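Note on the "minimum time between checkpoints" rule in checkpointing.md above: the interaction between the checkpoint interval and the minimum pause can be sketched in plain Java. This is a simplified model of the documented rule, not Flink's actual scheduler; the class and method names (`CheckpointTimingSketch`, `nextTriggerTime`, `simulate`) are hypothetical, and it assumes at most one checkpoint in flight.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of checkpoint trigger timing; NOT Flink's scheduler (hypothetical names). */
final class CheckpointTimingSketch {

    /**
     * Earliest time (ms) at which the next checkpoint may start: it must respect
     * both the interval since the last trigger and the pause since the last completion.
     */
    static long nextTriggerTime(long lastTriggerMs, long lastCompletionMs,
                                long intervalMs, long minPauseMs) {
        long byInterval = lastTriggerMs + intervalMs;
        long byMinPause = lastCompletionMs + minPauseMs;
        return Math.max(byInterval, byMinPause);
    }

    /** Simulates trigger times for a sequence of checkpoint durations, one checkpoint at a time. */
    static List<Long> simulate(long intervalMs, long minPauseMs, long[] durationsMs) {
        List<Long> triggers = new ArrayList<>();
        long trigger = 0L;
        for (long duration : durationsMs) {
            triggers.add(trigger);
            long completion = trigger + duration;
            trigger = nextTriggerTime(trigger, completion, intervalMs, minPauseMs);
        }
        return triggers;
    }

    public static void main(String[] args) {
        // interval 1000 ms, minimum pause 500 ms, checkpoint durations 200, 900, 100 ms
        System.out.println(simulate(1000L, 500L, new long[] {200L, 900L, 100L}));
        // prints [0, 1000, 2400]: the slow second checkpoint delays the third one
    }
}
```

In this model a fast checkpoint leaves the interval in control, while a slow one pushes the next trigger out to "completion plus minimum pause", which is why the effective interval never drops below the minimum pause.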

http://git-wip-us.apache.org/repos/asf/flink/blob/58509531/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
new file mode 100644
index 0000000..0b38a62
--- /dev/null
+++ b/docs/dev/stream/state.md
@@ -0,0 +1,78 @@
+---
+title: "Working with State"
+nav-parent_id: streaming
+nav-pos: 40
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* ToC
+{:toc}
+
+Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for
+any type of more elaborate operation. For example: 
+
+  - When an application searches for certain event patterns, the state will store the sequence of events encountered so far.
+  - When aggregating events per minute, the state holds the pending aggregates.
+  - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.
+
+In order to make state fault tolerant, Flink needs to be aware of the state and [checkpoint](checkpointing.html) it.
+In many cases, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk
+if necessary) to allow applications to hold very large state.
+
+This document explains how to use Flink's state abstractions when developing an application.
+
+
+## Keyed State and Operator State
+
+There are two basic kinds of state: `Keyed State` and `Operator State`.
+
+#### Keyed State
+
+*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`.
+Examples of keyed state are the `ValueState` or `ListState` that one can create in a function on a `KeyedStream`, as
+well as the state of a keyed window operator.
+
+Keyed State is organized in so-called *Key Groups*. Key Groups are the unit by which keyed state can be redistributed and
+there are as many key groups as the defined maximum parallelism.
+During execution each parallel instance of an operator gets one or more key groups.
+
+#### Operator State
+
+*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` interface in Flink 1.0 and Flink 1.1.
+The new `CheckpointedFunction` interface is basically a shortcut (syntactic sugar) for the Operator State.
+
+Operator State needs special re-distribution schemes when parallelism is changed. There can be different variations of such
+schemes; the following are currently defined:
+
+  - **List-style redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
+    all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
+    Each operator gets a sublist, which can be empty, or contain one or more elements.
+
+
+## Raw and Managed State
+
+*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+
+*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB.
+Examples are `ValueState`, `ListState`, etc. Flink's runtime encodes the states and writes them into the checkpoints.
+
+*Raw State* is state that users and operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into
+the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes.
+
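Note on the two redistribution mechanisms described in state.md above (key groups for keyed state, list-style redistribution for operator state): both can be sketched in plain Java. This is an illustration under stated assumptions, not Flink's implementation. The class and method names are hypothetical, the key-group assignment uses a plain `hashCode()` modulo (Flink applies its own hash function), and parallelism is assumed to be no larger than the maximum parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of state redistribution; hypothetical names, not Flink classes. */
final class StateRedistributionSketch {

    /** Key group of a key. Assumption: plain hashCode modulo the number of key groups. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to a parallel subtask so each subtask owns a contiguous range of groups. */
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** List-style redistribution: concatenate all lists, then split as evenly as possible. */
    static <T> List<List<T>> redistribute(List<List<T>> perSubtaskState, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perSubtaskState) {
            all.addAll(part);
        }
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            // the first 'remainder' subtasks receive one extra element
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    public static void main(String[] args) {
        // 128 key groups spread over 4 subtasks: groups 0..31 -> 0, 32..63 -> 1, and so on
        int group = keyGroupFor(42, 128);
        System.out.println(group + " -> subtask " + subtaskForKeyGroup(group, 128, 4));

        // operator state of 2 subtasks rescaled to 3 subtasks
        List<List<String>> old = Arrays.asList(
                Arrays.asList("a", "b", "c"), Arrays.asList("d", "e"));
        System.out.println(redistribute(old, 3)); // prints [[a, b], [c, d], [e]]
    }
}
```

Because the number of key groups is fixed by the maximum parallelism, rescaling keyed state only changes which subtask owns which range of groups; operator state has no keys, so the list is simply cut into new sublists, some of which may be empty when there are fewer elements than subtasks.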

http://git-wip-us.apache.org/repos/asf/flink/blob/58509531/docs/internals/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/internals/state_backends.md b/docs/internals/state_backends.md
deleted file mode 100644
index 11d46ed..0000000
--- a/docs/internals/state_backends.md
+++ /dev/null
@@ -1,71 +0,0 @@
----
-title:  "State and State Backends"
-nav-title: State Backends
-nav-parent_id: internals
-nav-pos: 4
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-**NOTE** This document is only a sketch of some bullet points, to be fleshed out.
-
-**NOTE** The structure of State Backends changed heavily between version 1.1 and 1.2. This documentation is only applicable
-to Apache Flink version 1.2 and later.
-
-
-## Keyed State and Operator state
-
-There are two basic state backends: `Keyed State` and `Operator State`.
-
-#### Keyed State
-
-*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`.
-Examples of keyed state are the `ValueState` or `ListState` that one can create in a function on a `KeyedStream`, as
-well as the state of a keyed window operator.
-
-Keyed State is organized in so called *Key Groups*. Key Groups are the unit by which keyed state can be redistributed and
-there are as many key groups as the defined maximum parallelism.
-During execution each parallel instance of an operator gets one or more key groups.
-
-#### Operator State
-
-*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` interface in Flink 1.0 and Flink 1.1.
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic sugar) for the Operator State.
-
-Operator State needs special re-distribution schemes when parallelism is changed. There can be different variations of such
-schemes; the following are currently defined:
-
-  - **List-style redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
-    all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
-    Each operator gets a sublist, which can be empty, or contain one or more elements.
-
-
-## Raw and Managed State
-
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
-
-*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes the states and writes them into the checkpoints.
-
-*Raw State* is state that users and operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into
-the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes.
-