You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/02/06 16:10:46 UTC

[2/2] flink git commit: [docs][state] add missing Java syntax highlighting to documentation

[docs][state] add missing Java syntax highlighting to documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4fe654e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4fe654e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4fe654e

Branch: refs/heads/release-1.2
Commit: f4fe654e7e1457a0f9f39f128545438baf46ffbc
Parents: 39e6049
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 6 11:03:45 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Feb 6 17:10:38 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state.md | 88 +++++++++++++++++++++++--------------------
 1 file changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4fe654e/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 124ce68..4d1cfab 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -230,9 +230,11 @@ while `(test2, 2)` will go to task 1.
 
 The `ListCheckpointed` interface requires the implementation of two methods:
 
-    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
 
-    void restoreState(List<T> state) throws Exception;
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
 
 On `snapshotState()` the operator should return a list of objects to checkpoint and
 `restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
@@ -242,9 +244,11 @@ return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
 
 The `CheckpointedFunction` interface also requires the implementation of two methods:
 
-    void snapshotState(FunctionSnapshotContext context) throws Exception;
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
 
-    void initializeState(FunctionInitializationContext context) throws Exception;
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
 
 Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
 or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
@@ -253,57 +257,61 @@ only the place where different types of state are initialized, but also where st
 This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
 uses state to buffer elements before sending them to the outside world:
 
-    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSink
+        implements SinkFunction<Tuple2<String, Integer>>,
+                   CheckpointedFunction,
+                   CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
 
-        private final int threshold;
+    private final int threshold;
 
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
 
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
 
-        public BufferingSink(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
 
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            checkpointedState.clear();
-            for (Tuple2<String, Integer> element : bufferedElements) {
-                checkpointedState.add(element);
-            }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkpointedState.clear();
+        for (Tuple2<String, Integer> element : bufferedElements) {
+            checkpointedState.add(element);
         }
+    }
 
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            checkpointedState = context.getOperatorStateStore().
-                getSerializableListState("buffered-elements");
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        checkpointedState = context.getOperatorStateStore().
+            getSerializableListState("buffered-elements");
 
-            if (context.isRestored()) {
-                for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                    bufferedElements.add(element);
-                }
+        if (context.isRestored()) {
+            for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                bufferedElements.add(element);
             }
         }
+    }
 
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-            // this is from the CheckpointedRestoring interface.
-            this.bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
 
 
 The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize