Posted to commits@flink.apache.org by al...@apache.org on 2020/06/03 13:06:31 UTC

[flink] 04/04: [FLINK-18032] Remove outdated sections in migration guide

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a3250c6a82a71f7bd9617cbf085bb40393576292
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Fri May 29 10:53:14 2020 +0200

    [FLINK-18032] Remove outdated sections in migration guide
---
 docs/dev/migration.md    | 456 +----------------------------------------------
 docs/dev/migration.zh.md | 456 +----------------------------------------------
 2 files changed, 8 insertions(+), 904 deletions(-)

diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index 288e2ef..09556c1 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -25,6 +25,10 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
+See the [older migration
+guide](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/migration.html)
+for information about migrating from older versions than Flink 1.3.
+
 ## Migrating from Flink 1.3+ to Flink 1.7
 
 ### API changes for serializer snapshots
@@ -35,456 +39,4 @@ The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will b
 in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see
 [Migrating from deprecated serializer snapshot APIs before Flink 1.7]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17).
 
-## Migrating from Flink 1.2 to Flink 1.3
-
-There are a few APIs that have been changed since Flink 1.2. Most of the changes are documented in their
-specific documentations. The following is a consolidated list of API changes and links to details for migration when
-upgrading to Flink 1.3.
-
-### `TypeSerializer` interface changes
-
-This would be relevant mostly for users implementing custom `TypeSerializer`s for their state.
-
-Since Flink 1.3, two additional methods have been added that are related to serializer compatibility
-across savepoint restores. Please see
-[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility)
-for further details on how to implement these methods.
-
-### `ProcessFunction` is always a `RichFunction`
-
-In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` was introduced.
-Since Flink 1.3, `RichProcessFunction` was removed and `ProcessFunction` is now always a `RichFunction` with access to
-the lifecycle methods and runtime context.
-
-### Flink CEP library API changes
-
-The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API.
-Please visit the [CEP Migration docs]({{ site.baseurl }}/dev/libs/cep.html#migrating-from-an-older-flink-version) for details.
-
-### Logger dependencies removed from Flink core artifacts
-
-In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are
-now clean of specific logger dependencies.
-
-Example and quickstart archetypes already have loggers specified and should not be affected.
-For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add:
-
-{% highlight xml %}
-<dependency>
-    <groupId>org.slf4j</groupId>
-    <artifactId>slf4j-log4j12</artifactId>
-    <version>1.7.7</version>
-</dependency>
-
-<dependency>
-    <groupId>log4j</groupId>
-    <artifactId>log4j</artifactId>
-    <version>1.2.17</version>
-</dependency>
-{% endhighlight %}
-
-## Migrating from Flink 1.1 to Flink 1.2
-
-As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state:
-**keyed** and **non-keyed** state (also called **operator** state). Both types are available to
-both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1
-function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the
-deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
-
-The migration process will serve two goals:
-
-1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
-
-2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its
-Flink 1.1 predecessor.
-
-After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2
-simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to
-your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its
-Flink 1.1 predecessor left off.
-
-### Example User Functions
-
-As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
-functions. The first is an example of a function with **keyed** state, while
-the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        counter = getRuntimeContext().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-}
-
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private ArrayList<Tuple2<String, Integer>> bufferedElements;
-
-    BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public ArrayList<Tuple2<String, Integer>> snapshotState(
-        long checkpointId, long checkpointTimestamp) throws Exception {
-        return bufferedElements;
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-
-The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form
-`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if
-the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
-containing the word itself and the number of occurrences.
-
-The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
-and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
-This is a common way to avoid many expensive calls to a database or an external storage system. To do the
-buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
-periodically checkpointed.
-
-### State API Migration
-
-To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.
-After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you
-are guaranteed that the new version of your job will start from where its predecessor left off.
-
-**Keyed State:** Something to note before delving into the details of the migration process is that if your function
-has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support
-for the new features and full backwards compatibility. Changes could be made just for better code organization,
-but this is just a matter of style.
-
-With the above said, the rest of this section focuses on the **non-keyed state**.
-
-#### Rescaling and new state abstractions
-
-The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
-to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction`
-interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
-`Checkpointed` one.
-
-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-More details on the principles behind rescaling of both keyed state and non-keyed state can be found in
-the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html).
-
-##### ListCheckpointed
-
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
-is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
-`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always
-return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink`
-is included below:
-
-{% highlight java %}
-public class BufferingSinkListCheckpointed implements
-        SinkFunction<Tuple2<String, Integer>>,
-        ListCheckpointed<Tuple2<String, Integer>>,
-        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSinkListCheckpointed(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        this.bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public List<Tuple2<String, Integer>> snapshotState(
-            long checkpointId, long timestamp) throws Exception {
-        return this.bufferedElements;
-    }
-
-    @Override
-    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
-        if (!state.isEmpty()) {
-            this.bufferedElements.addAll(state);
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
-compatibility reasons and more details will be explained at the end of this section.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface requires again the implementation of two methods:
-
-{% highlight java %}
-void snapshotState(FunctionSnapshotContext context) throws Exception;
-
-void initializeState(FunctionInitializationContext context) throws Exception;
-{% endhighlight %}
-
-As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
-the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only
-in the case that we are recovering from a failure. Given this, `initializeState()` is not only the place where different
-types of state are initialized, but also where state recovery logic is included. An implementation of the
-`CheckpointedFunction` interface for `BufferingSink` is presented below.
-
-{% highlight java %}
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        checkpointedState.clear();
-        for (Tuple2<String, Integer> element : bufferedElements) {
-            checkpointedState.add(element);
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        checkpointedState = context.getOperatorStateStore().
-            getSerializableListState("buffered-elements");
-
-        if (context.isRestored()) {
-            for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                bufferedElements.add(element);
-            }
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize
-the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
-are going to be stored upon checkpointing:
-
-`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
-
-After initializing the container, we use the `isRestored()` method of the context to check if we are
-recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
-
-As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
-initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
-of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
-
-As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
-using the `FunctionInitializationContext` given as argument, instead of the `RuntimeContext`, which is the case
-for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `CountMapper` example,
-the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
-would look like this:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-        implements CheckpointedFunction {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        // all managed, nothing to do.
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        counter = context.getKeyedStateStore().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-}
-{% endhighlight %}
-
-Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
-upon checkpointing.
-
-#### Backwards compatibility with Flink 1.1
-
-So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.
-The question that remains is "Can I make sure that my modified (Flink 1.2) job will start from where my already
-running job from Flink 1.1 stopped?".
-
-The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing.
-Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to
-implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the
-familiar `restoreState()` from the old `Checkpointed` interface from Flink 1.1. As shown in the modified code of
-the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
-
-### Aligned Processing Time Window Operators
-
-In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger,
-the command `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be
-either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of
-these operators are referred to as *aligned* window operators as they assume their input elements arrive in
-order. This is valid when operating in processing time, as elements get as timestamp the wall-clock time at
-the moment they arrive at the window operator. These operators were restricted to using the memory state backend, and
-had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.
-
-In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
-`WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
-read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
-that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
-
-<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators
-in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the
-`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
-windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
-resume execution from a Flink 1.1 savepoint while using these operators.
-
-<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities
-and **no backwards compatibility** with Flink 1.1.
-
-The code to use the aligned window operators in Flink 1.2 is presented below:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// for tumbling windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// for tumbling windows
-val window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-val window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-</div>
-
 {% top %}
diff --git a/docs/dev/migration.zh.md b/docs/dev/migration.zh.md
index fb1c559..76b4e8c 100644
--- a/docs/dev/migration.zh.md
+++ b/docs/dev/migration.zh.md
@@ -25,6 +25,10 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
+See the [older migration
+guide](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/migration.html)
+for information about migrating from older versions than Flink 1.3.
+
 ## Migrating from Flink 1.3+ to Flink 1.7
 
 ### API changes for serializer snapshots
@@ -35,456 +39,4 @@ The old `TypeSerializerConfigSnapshot` abstraction is now deprecated, and will b
 in favor of the new `TypeSerializerSnapshot`. For details and guides on how to migrate, please see
 [Migrating from deprecated serializer snapshot APIs before Flink 1.7]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#migrating-from-deprecated-serializer-snapshot-apis-before-flink-17).
 
-## Migrating from Flink 1.2 to Flink 1.3
-
-There are a few APIs that have been changed since Flink 1.2. Most of the changes are documented in their
-specific documentations. The following is a consolidated list of API changes and links to details for migration when
-upgrading to Flink 1.3.
-
-### `TypeSerializer` interface changes
-
-This would be relevant mostly for users implementing custom `TypeSerializer`s for their state.
-
-Since Flink 1.3, two additional methods have been added that are related to serializer compatibility
-across savepoint restores. Please see
-[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility)
-for further details on how to implement these methods.
-
-### `ProcessFunction` is always a `RichFunction`
-
-In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` was introduced.
-Since Flink 1.3, `RichProcessFunction` was removed and `ProcessFunction` is now always a `RichFunction` with access to
-the lifecycle methods and runtime context.
-
-### Flink CEP library API changes
-
-The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API.
-Please visit the [CEP Migration docs]({{ site.baseurl }}/dev/libs/cep.html#migrating-from-an-older-flink-version) for details.
-
-### Logger dependencies removed from Flink core artifacts
-
-In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are
-now clean of specific logger dependencies.
-
-Example and quickstart archetypes already have loggers specified and should not be affected.
-For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add:
-
-{% highlight xml %}
-<dependency>
-    <groupId>org.slf4j</groupId>
-    <artifactId>slf4j-log4j12</artifactId>
-    <version>1.7.7</version>
-</dependency>
-
-<dependency>
-    <groupId>log4j</groupId>
-    <artifactId>log4j</artifactId>
-    <version>1.2.17</version>
-</dependency>
-{% endhighlight %}
-
-## Migrating from Flink 1.1 to Flink 1.2
-
-As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state:
-**keyed** and **non-keyed** state (also called **operator** state). Both types are available to
-both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1
-function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the
-deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
-
-The migration process will serve two goals:
-
-1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
-
-2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its
-Flink 1.1 predecessor.
-
-After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2
-simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to
-your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its
-Flink 1.1 predecessor left off.
-
-### Example User Functions
-
-As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
-functions. The first is an example of a function with **keyed** state, while
-the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        counter = getRuntimeContext().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-}
-
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private ArrayList<Tuple2<String, Integer>> bufferedElements;
-
-    BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public ArrayList<Tuple2<String, Integer>> snapshotState(
-        long checkpointId, long checkpointTimestamp) throws Exception {
-        return bufferedElements;
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-
-The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form
-`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if
-the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
-containing the word itself and the number of occurrences.
-
-The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
-and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
-This is a common way to avoid many expensive calls to a database or an external storage system. To do the
-buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
-periodically checkpointed.
-
-### State API Migration
-
-To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.
-After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you
-are guaranteed that the new version of your job will start from where its predecessor left off.
-
-**Keyed State:** Something to note before delving into the details of the migration process is that if your function
-has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support
-for the new features and full backwards compatibility. Changes could be made just for better code organization,
-but this is just a matter of style.
-
-With the above said, the rest of this section focuses on the **non-keyed state**.
-
-#### Rescaling and new state abstractions
-
-The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
-to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction`
-interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
-`Checkpointed` one.
-
-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-More details on the principles behind rescaling of both keyed state and non-keyed state can be found in
-the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html).
-
-##### ListCheckpointed
-
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List<T> state) throws Exception;
-{% endhighlight %}
-
-Their semantics are the same as those of their counterparts in the old `Checkpointed` interface. The only difference
-is that `snapshotState()` now returns a list of objects to checkpoint, as stated earlier, and
-`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always
-return `Collections.singletonList(MY_STATE)` from `snapshotState()`. The updated code for `BufferingSink`
-is included below:
-
-{% highlight java %}
-public class BufferingSinkListCheckpointed implements
-        SinkFunction<Tuple2<String, Integer>>,
-        ListCheckpointed<Tuple2<String, Integer>>,
-        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSinkListCheckpointed(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        this.bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public List<Tuple2<String, Integer>> snapshotState(
-            long checkpointId, long timestamp) throws Exception {
-        return this.bufferedElements;
-    }
-
-    @Override
-    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
-        if (!state.isEmpty()) {
-            this.bufferedElements.addAll(state);
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
-compatibility reasons and more details will be explained at the end of this section.
-
-##### CheckpointedFunction
-
-The `CheckpointedFunction` interface also requires the implementation of two methods:
-
-{% highlight java %}
-void snapshotState(FunctionSnapshotContext context) throws Exception;
-
-void initializeState(FunctionInitializationContext context) throws Exception;
-{% endhighlight %}
-
-As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
-the counterpart of `restoreState()`) is called every time the user-defined function is initialized, rather than only
-when recovering from a failure. Given this, `initializeState()` is not only the place where different
-types of state are initialized, but also where state recovery logic is included. An implementation of the
-`CheckpointedFunction` interface for `BufferingSink` is presented below.
-
-{% highlight java %}
-public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-
-    private final int threshold;
-
-    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-
-    private List<Tuple2<String, Integer>> bufferedElements;
-
-    public BufferingSink(int threshold) {
-        this.threshold = threshold;
-        this.bufferedElements = new ArrayList<>();
-    }
-
-    @Override
-    public void invoke(Tuple2<String, Integer> value) throws Exception {
-        bufferedElements.add(value);
-        if (bufferedElements.size() == threshold) {
-            for (Tuple2<String, Integer> element: bufferedElements) {
-                // send it to the sink
-            }
-            bufferedElements.clear();
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        checkpointedState.clear();
-        for (Tuple2<String, Integer> element : bufferedElements) {
-            checkpointedState.add(element);
-        }
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        checkpointedState = context.getOperatorStateStore().
-            getSerializableListState("buffered-elements");
-
-        if (context.isRestored()) {
-            for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                bufferedElements.add(element);
-            }
-        }
-    }
-
-    @Override
-    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-        // this is from the CheckpointedRestoring interface.
-        this.bufferedElements.addAll(state);
-    }
-}
-{% endhighlight %}
-
-The `initializeState()` method takes a `FunctionInitializationContext` as its argument. This is used to initialize
-the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
-are going to be stored upon checkpointing:
-
-`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
-
-After initializing the container, we use the `isRestored()` method of the context to check if we are
-recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
-
-As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
-initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
-of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
-
-As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
-using the `FunctionInitializationContext` given as argument, instead of the `RuntimeContext` used
-in Flink 1.1. If the `CheckpointedFunction` interface were used in the `CountMapper` example,
-the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
-would look like this:
-
-{% highlight java %}
-public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-        implements CheckpointedFunction {
-
-    private transient ValueState<Integer> counter;
-
-    private final int numberElements;
-
-    public CountMapper(int numberElements) {
-        this.numberElements = numberElements;
-    }
-
-    @Override
-    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-        int count = counter.value() + 1;
-        counter.update(count);
-
-        if (count % numberElements == 0) {
-            out.collect(Tuple2.of(value.f0, count));
-            counter.update(0); // reset to 0
-        }
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext context) throws Exception {
-        // all managed, nothing to do.
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext context) throws Exception {
-        counter = context.getKeyedStateStore().getState(
-            new ValueStateDescriptor<>("counter", Integer.class, 0));
-    }
-}
-{% endhighlight %}
-
-Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
-upon checkpointing.
-
-#### Backwards compatibility with Flink 1.1
-
-So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.
-The question that remains is "Can I make sure that my modified (Flink 1.2) job will start from where my already
-running job from Flink 1.1 stopped?".
-
-The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing.
-Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to
-implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the
-familiar `restoreState()` from the old `Checkpointed` interface from Flink 1.1. As shown in the modified code of
-the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
-
-### Aligned Processing Time Window Operators
-
-In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger,
-the command `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be
-either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of
-these operators are referred to as *aligned* window operators as they assume their input elements arrive in
-order. This assumption holds in processing time, as each element is assigned as its timestamp the wall-clock time at
-the moment it arrives at the window operator. These operators were restricted to using the memory state backend, and
-had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.
-
-In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
-`WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
-read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
-that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
-
-<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators
-in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the
-`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
-windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
-resume execution from a Flink 1.1 savepoint while using these operators.
-
-<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities
-and **no backwards compatibility** with Flink 1.1.
-
-The code to use the aligned window operators in Flink 1.2 is presented below:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// for tumbling windows
-DataStream<Tuple2<String, Integer>> window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function);
-
-// for sliding windows
-DataStream<Tuple2<String, Integer>> window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function);
-
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// for tumbling windows
-val window1 = source
-    .keyBy(0)
-    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
-    .apply(your-function)
-
-// for sliding windows
-val window2 = source
-    .keyBy(0)
-    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
-    .apply(your-function)
-
-{% endhighlight %}
-</div>
-</div>
-
 {% top %}