You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/02 15:35:54 UTC

flink git commit: [FLINK-5502] [docs] Add migration guide in docs.

Repository: flink
Updated Branches:
  refs/heads/master 7d2c25ddf -> 35460d24f


[FLINK-5502] [docs] Add migration guide in docs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35460d24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35460d24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35460d24

Branch: refs/heads/master
Commit: 35460d24fcd1c736eb60033840aa5303c710ff6e
Parents: 7d2c25d
Author: kl0u <kk...@gmail.com>
Authored: Fri Jan 13 15:52:04 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Feb 2 16:34:48 2017 +0100

----------------------------------------------------------------------
 docs/dev/migration.md | 393 ++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 390 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35460d24/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index c74952c..a5910a8 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -25,9 +25,396 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## Flink 1.1 to 1.2
+## Migrating from Flink 1.1 to Flink 1.2
 
-### State API
+As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state.html), Flink has two types of state:
+**keyed** and **non-keyed** state (also called **operator** state). Both types are available to
+both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1
+function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the
+deprecation of the aligned window operators from Flink 1.1 (see [Aligned Processing Time Window Operators](#aligned-processing-time-window-operators)).
 
-### Fast Processing Time Window Operators
+The migration process will serve two goals:
 
+1. allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
+
+2. make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its
+Flink 1.1 predecessor.
+
+After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2
+simply by taking a [savepoint]({{ site.baseurl }}/setup/savepoints.html) with your Flink 1.1 job and giving it to
+your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its
+Flink 1.1 predecessor left off.
+
+### Example User Functions
+
+As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
+functions. The first is an example of a function with **keyed** state, while
+the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
+
+    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+
+        private transient ValueState<Integer> counter;
+
+        private final int numberElements;
+
+        public CountMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            counter = getRuntimeContext().getState(
+      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
+        }
+
+        @Override
+        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+            int count = counter.value() + 1;
+      	    counter.update(count);
+
+      	    if (count % numberElements == 0) {
+     		    out.collect(Tuple2.of(value.f0, count));
+     		    counter.update(0); // reset to 0
+     	    }
+        }
+    }
+
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
+
+	    private final int threshold;
+
+	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
+
+	    BufferingSink(int threshold) {
+		    this.threshold = threshold;
+		    this.bufferedElements = new ArrayList<>();
+	    }
+
+    	@Override
+	    public void invoke(Tuple2<String, Integer> value) throws Exception {
+		    bufferedElements.add(value);
+		    if (bufferedElements.size() == threshold) {
+			    for (Tuple2<String, Integer> element: bufferedElements) {
+				    // send it to the sink
+			    }
+			    bufferedElements.clear();
+		    }
+	    }
+
+	    @Override
+	    public ArrayList<Tuple2<String, Integer>> snapshotState(
+	            long checkpointId, long checkpointTimestamp) throws Exception {
+		    return bufferedElements;
+	    }
+
+	    @Override
+	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+	    	bufferedElements.addAll(state);
+        }
+    }
+
+
+The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped-by-key input stream of the form
+`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if
+the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted
+containing the word itself and the number of occurrences.
+
+The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
+and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink.
+This is a common way to avoid many expensive calls to a database or an external storage system. To do the
+buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is
+periodically checkpointed.
+
+### State API Migration
+
+To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions.
+After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you
+are guaranteed that the new version of your job will start from where its predecessor left off.
+
+**Keyed State:** Something to note before delving into the details of the migration process is that if your function
+has **only keyed state**, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support
+for the new features and full backwards compatibility. Changes could be made just for better code organization,
+but this is just a matter of style.
+
+With the above said, the rest of this section focuses on the **non-keyed state**.
+
+#### Rescaling and new state abstractions
+
+The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface
+to the new ones. In Flink 1.2, a stateful function can implement either the more general `CheckpointedFunction`
+interface, or the `ListCheckpointed<T extends Serializable>` interface, which is semantically closer to the old
+`Checkpointed` one.
+
+In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
+non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
+contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
+while `(test2, 2)` will go to task 1.
+
+More details on the principles behind rescaling of both keyed state and non-keyed state can be found in
+the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
+
+##### ListCheckpointed
+
+The `ListCheckpointed` interface requires the implementation of two methods:
+
+    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+
+    void restoreState(List<T> state) throws Exception;
+
+Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
+is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
+`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always
+return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink`
+is included below:
+
+    public class BufferingSinkListCheckpointed implements
+            SinkFunction<Tuple2<String, Integer>>,
+            ListCheckpointed<Tuple2<String, Integer>>,
+            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+
+        private List<Tuple2<String, Integer>> bufferedElements;
+
+        public BufferingSinkListCheckpointed(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            this.bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public List<Tuple2<String, Integer>> snapshotState(
+                long checkpointId, long timestamp) throws Exception {
+            return this.bufferedElements;
+        }
+
+        @Override
+        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
+            if (!state.isEmpty()) {
+                this.bufferedElements.addAll(state);
+            }
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+            // this is from the CheckpointedRestoring interface.
+            this.bufferedElements.addAll(state);
+        }
+    }
+
+As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
+compatibility reasons and more details will be explained at the end of this section.
+
+##### CheckpointedFunction
+
+The `CheckpointedFunction` interface requires again the implementation of two methods:
+
+    void snapshotState(FunctionSnapshotContext context) throws Exception;
+
+    void initializeState(FunctionInitializationContext context) throws Exception;
+
+As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
+the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only
+in the case that we are recovering from a failure. Given this, `initializeState()` is not only the place where different
+types of state are initialized, but also where state recovery logic is included. An implementation of the
+`CheckpointedFunction` interface for `BufferingSink` is presented below.
+
+    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+        private final int threshold;
+
+        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+
+        private List<Tuple2<String, Integer>> bufferedElements;
+
+        public BufferingSink(int threshold) {
+            this.threshold = threshold;
+            this.bufferedElements = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(Tuple2<String, Integer> value) throws Exception {
+            bufferedElements.add(value);
+            if (bufferedElements.size() == threshold) {
+                for (Tuple2<String, Integer> element: bufferedElements) {
+                    // send it to the sink
+                }
+                bufferedElements.clear();
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            checkpointedState.clear();
+            for (Tuple2<String, Integer> element : bufferedElements) {
+                checkpointedState.add(element);
+            }
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            checkpointedState = context.getOperatorStateStore().
+                getSerializableListState("buffered-elements");
+
+            if (context.isRestored()) {
+                for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                    bufferedElements.add(element);
+                }
+            }
+        }
+
+        @Override
+        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+            // this is from the CheckpointedRestoring interface.
+            this.bufferedElements.addAll(state);
+        }
+    }
+
+The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize
+the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
+are going to be stored upon checkpointing:
+
+`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
+
+After initializing the container, we use the `isRestored()` method of the context to check if we are
+recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
+
+As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state
+initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared
+of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
+
+As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
+using the `FunctionInitializationContext` given as argument, instead of the `RuntimeContext`, which is the case
+for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `CountMapper` example,
+the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
+would look like this:
+
+    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
+            implements CheckpointedFunction {
+
+        private transient ValueState<Integer> counter;
+
+        private final int numberElements;
+
+        public CountMapper(int numberElements) {
+            this.numberElements = numberElements;
+        }
+
+        @Override
+        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+            int count = counter.value() + 1;
+            counter.update(count);
+
+            if (count % numberElements == 0) {
+                out.collect(Tuple2.of(value.f0, count));
+             	counter.update(0); // reset to 0
+             	}
+            }
+        }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) throws Exception {
+            //all managed, nothing to do.
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) throws Exception {
+            counter = context.getKeyedStateStore().getState(
+                new ValueStateDescriptor<>("counter", Integer.class, 0));
+        }
+    }
+
+Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
+upon checkpointing.
+
+#### Backwards compatibility with Flink 1.1
+
+So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2.
+The question that remains is "Can I make sure that my modified (Flink 1.2) job will start from where my already
+running job from Flink 1.1 stopped?".
+
+The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing.
+Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to
+implement the `CheckpointedRestoring` interface, as shown in the code above. This has a single method, the
+familiar `restoreState()` from the old `Checkpointed` interface from Flink 1.1. As shown in the modified code of
+the `BufferingSink`, the `restoreState()` method is identical to its predecessor.
+
+### Aligned Processing Time Window Operators
+
+In Flink 1.1, and only when operating on *processing time* with no specified evictor or trigger,
+the command `timeWindow()` on a keyed stream would instantiate a special type of `WindowOperator`. This could be
+either an `AggregatingProcessingTimeWindowOperator` or an `AccumulatingProcessingTimeWindowOperator`. Both of
+these operators are referred to as *aligned* window operators as they assume their input elements arrive in
+order. This is valid when operating in processing time, as elements get as timestamp the wall-clock time at
+the moment they arrive at the window operator. These operators were restricted to using the memory state backend, and
+had optimized data structures for storing the per-window elements which leveraged the in-order input element arrival.
+
+In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic
+`WindowOperator`. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently
+read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format
+that is compatible with the generic `WindowOperator`, and resume execution using the generic `WindowOperator`.
+
+<span class="label label-info">Note</span> Although deprecated, you can still use the aligned window operators
+in Flink 1.2 through special `WindowAssigners` introduced for exactly this purpose. These assigners are the
+`SlidingAlignedProcessingTimeWindows` and the `TumblingAlignedProcessingTimeWindows` assigners, for sliding and tumbling
+windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to
+resume execution from a Flink 1.1 savepoint while using these operators.
+
+<span class="label label-danger">Attention</span> The aligned window operators provide **no rescaling** capabilities
+and **no backwards compatibility** with Flink 1.1.
+
+The code to use the aligned window operators in Flink 1.2 is presented below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// for tumbling windows
+DataStream<Tuple2<String, Integer>> window1 = source
+	.keyBy(0)
+	.window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
+	.apply(your-function)
+
+// for sliding windows
+DataStream<Tuple2<String, Integer>> window1 = source
+	.keyBy(0)
+	.window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+	.apply(your-function)
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// for tumbling windows
+val window1 = source
+    .keyBy(0)
+    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
+    .apply(your-function)
+
+// for sliding windows
+val window2 = source
+    .keyBy(0)
+    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+    .apply(your-function)
+
+{% endhighlight %}
+</div>
+</div>