Posted to commits@flink.apache.org by uc...@apache.org on 2017/02/06 16:10:46 UTC
[2/2] flink git commit: [docs][state] add missing Java syntax highlighting to documentation
[docs][state] add missing Java syntax highlighting to documentation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4fe654e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4fe654e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4fe654e
Branch: refs/heads/release-1.2
Commit: f4fe654e7e1457a0f9f39f128545438baf46ffbc
Parents: 39e6049
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 6 11:03:45 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Feb 6 17:10:38 2017 +0100
----------------------------------------------------------------------
docs/dev/stream/state.md | 88 +++++++++++++++++++++++--------------------
1 file changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f4fe654e/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 124ce68..4d1cfab 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -230,9 +230,11 @@ while `(test2, 2)` will go to task 1.
The `ListCheckpointed` interface requires the implementation of two methods:
-    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
-    void restoreState(List<T> state) throws Exception;
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
On `snapshotState()` the operator should return a list of objects to checkpoint and
`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
@@ -242,9 +244,11 @@ return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
The `CheckpointedFunction` interface also requires the implementation of two methods:
-    void snapshotState(FunctionSnapshotContext context) throws Exception;
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
-    void initializeState(FunctionInitializationContext context) throws Exception;
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
@@ -253,57 +257,61 @@ only the place where different types of state are initialized, but also where st
This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
uses state to buffer elements before sending them to the outside world:
-    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSink
+        implements SinkFunction<Tuple2<String, Integer>>,
+                CheckpointedFunction,
+                CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
-        private final int threshold;
+    private final int threshold;
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
-        public BufferingSink(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            checkpointedState.clear();
-            for (Tuple2<String, Integer> element : bufferedElements) {
-                checkpointedState.add(element);
-            }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkpointedState.clear();
+        for (Tuple2<String, Integer> element : bufferedElements) {
+            checkpointedState.add(element);
         }
+    }
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            checkpointedState = context.getOperatorStateStore().
-                getSerializableListState("buffered-elements");
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        checkpointedState = context.getOperatorStateStore().
+            getSerializableListState("buffered-elements");
-            if (context.isRestored()) {
-                for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                    bufferedElements.add(element);
-                }
+        if (context.isRestored()) {
+            for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                bufferedElements.add(element);
             }
         }
+    }
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-            // this is from the CheckpointedRestoring interface.
-            this.bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize