You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/10 18:36:03 UTC

[2/3] flink git commit: [FLINK-6512] [docs] improved code formatting in some examples

[FLINK-6512] [docs] improved code formatting in some examples

This closes #3857


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a8d95e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a8d95e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a8d95e3

Branch: refs/heads/master
Commit: 8a8d95e31132889ea5cc3423ea50280fbaf47062
Parents: 91f3765
Author: David Anderson <da...@alpinegizmo.com>
Authored: Tue May 9 17:23:46 2017 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed May 10 13:30:02 2017 -0400

----------------------------------------------------------------------
 docs/dev/best_practices.md |  30 ++--
 docs/dev/migration.md      | 300 +++++++++++++++++++++-------------------
 2 files changed, 171 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8a8d95e3/docs/dev/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/dev/best_practices.md b/docs/dev/best_practices.md
index b2111c4..4dfd7fd 100644
--- a/docs/dev/best_practices.md
+++ b/docs/dev/best_practices.md
@@ -59,8 +59,8 @@ ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
 This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line.
 {% highlight java %}
 public static void main(String[] args) {
-	ParameterTool parameter = ParameterTool.fromArgs(args);
-	// .. regular code ..
+    ParameterTool parameter = ParameterTool.fromArgs(args);
+    // .. regular code ..
 {% endhighlight %}
 
 
@@ -114,17 +114,18 @@ The example below shows how to pass the parameters as a `Configuration` object t
 
 {% highlight java %}
 ParameterTool parameters = ParameterTool.fromArgs(args);
-DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
+DataSet<Tuple2<String, Integer>> counts = text
+        .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
 {% endhighlight %}
 
 In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method:
 
 {% highlight java %}
 public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		parameters.getInteger("myInt", -1);
-		// .. do
+    @Override
+    public void open(Configuration parameters) throws Exception {
+	parameters.getInteger("myInt", -1);
+	// .. do
 {% endhighlight %}
 
 
@@ -147,11 +148,12 @@ Access them in any rich user function:
 {% highlight java %}
 public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
-	@Override
-	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-		ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
-		parameters.getRequired("input");
-		// .. do more ..
+    @Override
+    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+	ParameterTool parameters = (ParameterTool)
+	    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+	parameters.getRequired("input");
+	// .. do more ..
 {% endhighlight %}
 
 
@@ -198,8 +200,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MyClass implements MapFunction {
-	private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
-	// ...
+    private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
+    // ...
 {% endhighlight %}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8a8d95e3/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index a5910a8..11eb42c 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -51,69 +51,70 @@ As running examples for the remainder of this document we will use the `CountMap
 functions. The first is an example of a function with **keyed** state, while
 the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
 
-    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
-        private transient ValueState<Integer> counter;
+    private transient ValueState<Integer> counter;
 
-        private final int numberElements;
+    private final int numberElements;
 
-        public CountMapper(int numberElements) {
-            this.numberElements = numberElements;
-        }
+    public CountMapper(int numberElements) {
+        this.numberElements = numberElements;
+    }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            counter = getRuntimeContext().getState(
-      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
-        }
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        counter = getRuntimeContext().getState(
+            new ValueStateDescriptor<>("counter", Integer.class, 0));
+    }
 
-        @Override
-        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-            int count = counter.value() + 1;
-      	    counter.update(count);
+    @Override
+    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+        int count = counter.value() + 1;
+        counter.update(count);
 
-      	    if (count % numberElements == 0) {
-     		    out.collect(Tuple2.of(value.f0, count));
-     		    counter.update(0); // reset to 0
-     	    }
+        if (count % numberElements == 0) {
+            out.collect(Tuple2.of(value.f0, count));
+            counter.update(0); // reset to 0
         }
     }
+}
 
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
 
-    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-	    private final int threshold;
+    private final int threshold;
 
-	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
+    private ArrayList<Tuple2<String, Integer>> bufferedElements;
 
-	    BufferingSink(int threshold) {
-		    this.threshold = threshold;
-		    this.bufferedElements = new ArrayList<>();
-	    }
+    BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-    	@Override
-	    public void invoke(Tuple2<String, Integer> value) throws Exception {
-		    bufferedElements.add(value);
-		    if (bufferedElements.size() == threshold) {
-			    for (Tuple2<String, Integer> element: bufferedElements) {
-				    // send it to the sink
-			    }
-			    bufferedElements.clear();
-		    }
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+	        // send it to the sink
 	    }
+	    bufferedElements.clear();
+	}
+    }
 
-	    @Override
-	    public ArrayList<Tuple2<String, Integer>> snapshotState(
-	            long checkpointId, long checkpointTimestamp) throws Exception {
-		    return bufferedElements;
-	    }
+    @Override
+    public ArrayList<Tuple2<String, Integer>> snapshotState(
+        long checkpointId, long checkpointTimestamp) throws Exception {
+	    return bufferedElements;
+    }
 
-	    @Override
-	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-	    	bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
 
 
 The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped-by-key input stream of the form
@@ -160,9 +161,11 @@ the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
 
 The `ListCheckpointed` interface requires the implementation of two methods:
 
-    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
 
-    void restoreState(List<T> state) throws Exception;
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
 
 Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
 is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
@@ -170,53 +173,55 @@ is that now `snapshotState()` should return a list of objects to checkpoint, as
 return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink`
 is included below:
 
-    public class BufferingSinkListCheckpointed implements
-            SinkFunction<Tuple2<String, Integer>>,
-            ListCheckpointed<Tuple2<String, Integer>>,
-            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSinkListCheckpointed implements
+        SinkFunction<Tuple2<String, Integer>>,
+        ListCheckpointed<Tuple2<String, Integer>>,
+        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
 
-        private final int threshold;
+    private final int threshold;
 
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
 
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
 
-        public BufferingSinkListCheckpointed(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSinkListCheckpointed(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            this.bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        this.bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
 
-        @Override
-        public List<Tuple2<String, Integer>> snapshotState(
-                long checkpointId, long timestamp) throws Exception {
-            return this.bufferedElements;
-        }
-
-        @Override
-        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
-            if (!state.isEmpty()) {
-                this.bufferedElements.addAll(state);
-            }
-        }
+    @Override
+    public List<Tuple2<String, Integer>> snapshotState(
+            long checkpointId, long timestamp) throws Exception {
+        return this.bufferedElements;
+    }
 
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-            // this is from the CheckpointedRestoring interface.
+    @Override
+    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
+        if (!state.isEmpty()) {
             this.bufferedElements.addAll(state);
         }
     }
 
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
+    }
+}
+{% endhighlight %}
+
 As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
 compatibility reasons and more details will be explained at the end of this section.
 
@@ -224,9 +229,11 @@ compatibility reasons and more details will be explained at the end of this sect
 
 The `CheckpointedFunction` interface requires again the implementation of two methods:
 
-    void snapshotState(FunctionSnapshotContext context) throws Exception;
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
 
-    void initializeState(FunctionInitializationContext context) throws Exception;
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
 
 As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
 the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only
@@ -234,57 +241,59 @@ in the case that we are recovering from a failure. Given this, `initializeState(
 types of state are initialized, but also where state recovery logic is included. An implementation of the
 `CheckpointedFunction` interface for `BufferingSink` is presented below.
 
-    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
 
-        private final int threshold;
+    private final int threshold;
 
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
 
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
 
-        public BufferingSink(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
 
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            checkpointedState.clear();
-            for (Tuple2<String, Integer> element : bufferedElements) {
-                checkpointedState.add(element);
-            }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkpointedState.clear();
+        for (Tuple2<String, Integer> element : bufferedElements) {
+            checkpointedState.add(element);
         }
+    }
 
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            checkpointedState = context.getOperatorStateStore().
-                getSerializableListState("buffered-elements");
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        checkpointedState = context.getOperatorStateStore().
+            getSerializableListState("buffered-elements");
 
-            if (context.isRestored()) {
-                for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                    bufferedElements.add(element);
-                }
+        if (context.isRestored()) {
+            for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                bufferedElements.add(element);
             }
         }
+    }
 
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-            // this is from the CheckpointedRestoring interface.
-            this.bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
 
 The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize
 the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
@@ -305,40 +314,41 @@ for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `Co
 the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
 would look like this:
 
-    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-            implements CheckpointedFunction {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
+        implements CheckpointedFunction {
 
-        private transient ValueState<Integer> counter;
+    private transient ValueState<Integer> counter;
 
-        private final int numberElements;
+    private final int numberElements;
 
-        public CountMapper(int numberElements) {
-            this.numberElements = numberElements;
-        }
+    public CountMapper(int numberElements) {
+        this.numberElements = numberElements;
+    }
 
-        @Override
-        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-            int count = counter.value() + 1;
-            counter.update(count);
+    @Override
+    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+        int count = counter.value() + 1;
+        counter.update(count);
 
-            if (count % numberElements == 0) {
-                out.collect(Tuple2.of(value.f0, count));
-             	counter.update(0); // reset to 0
-             	}
-            }
+        if (count % numberElements == 0) {
+            out.collect(Tuple2.of(value.f0, count));
+            counter.update(0); // reset to 0
         }
+    }
 
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            //all managed, nothing to do.
-        }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        // all managed, nothing to do.
+    }
 
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            counter = context.getKeyedStateStore().getState(
-                new ValueStateDescriptor<>("counter", Integer.class, 0));
-        }
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        counter = context.getKeyedStateStore().getState(
+            new ValueStateDescriptor<>("counter", Integer.class, 0));
     }
+}
+{% endhighlight %}
 
 Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
 upon checkpointing.