You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/10 18:59:23 UTC
[2/2] flink git commit: [FLINK-6512] [docs] improved code formatting
in some examples
[FLINK-6512] [docs] improved code formatting in some examples
This closes #3857
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fbd08b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fbd08b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fbd08b5
Branch: refs/heads/release-1.2
Commit: 9fbd08b58f4ead2dddcc283f99385fa5be94eecf
Parents: 048c0a3
Author: David Anderson <da...@alpinegizmo.com>
Authored: Tue May 9 17:23:46 2017 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed May 10 14:38:48 2017 -0400
----------------------------------------------------------------------
docs/dev/migration.md | 300 +++++++++++++++++----------------
docs/monitoring/best_practices.md | 30 ++--
2 files changed, 171 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9fbd08b5/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index a5910a8..11eb42c 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -51,69 +51,70 @@ As running examples for the remainder of this document we will use the `CountMap
functions. The first is an example of a function with **keyed** state, while
the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
- public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
- private transient ValueState<Integer> counter;
+ private transient ValueState<Integer> counter;
- private final int numberElements;
+ private final int numberElements;
- public CountMapper(int numberElements) {
- this.numberElements = numberElements;
- }
+ public CountMapper(int numberElements) {
+ this.numberElements = numberElements;
+ }
- @Override
- public void open(Configuration parameters) throws Exception {
- counter = getRuntimeContext().getState(
- new ValueStateDescriptor<>("counter", Integer.class, 0));
- }
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ counter = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("counter", Integer.class, 0));
+ }
- @Override
- public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
- int count = counter.value() + 1;
- counter.update(count);
+ @Override
+ public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+ int count = counter.value() + 1;
+ counter.update(count);
- if (count % numberElements == 0) {
- out.collect(Tuple2.of(value.f0, count));
- counter.update(0); // reset to 0
- }
+ if (count % numberElements == 0) {
+ out.collect(Tuple2.of(value.f0, count));
+ counter.update(0); // reset to 0
}
}
+}
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+ Checkpointed<ArrayList<Tuple2<String, Integer>>> {
- public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
- Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
- private final int threshold;
+ private final int threshold;
- private ArrayList<Tuple2<String, Integer>> bufferedElements;
+ private ArrayList<Tuple2<String, Integer>> bufferedElements;
- BufferingSink(int threshold) {
- this.threshold = threshold;
- this.bufferedElements = new ArrayList<>();
- }
+ BufferingSink(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
- @Override
- public void invoke(Tuple2<String, Integer> value) throws Exception {
- bufferedElements.add(value);
- if (bufferedElements.size() == threshold) {
- for (Tuple2<String, Integer> element: bufferedElements) {
- // send it to the sink
- }
- bufferedElements.clear();
- }
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception {
+ bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
}
+ bufferedElements.clear();
+ }
+ }
- @Override
- public ArrayList<Tuple2<String, Integer>> snapshotState(
- long checkpointId, long checkpointTimestamp) throws Exception {
- return bufferedElements;
- }
+ @Override
+ public ArrayList<Tuple2<String, Integer>> snapshotState(
+ long checkpointId, long checkpointTimestamp) throws Exception {
+ return bufferedElements;
+ }
- @Override
- public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
- bufferedElements.addAll(state);
- }
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+ bufferedElements.addAll(state);
}
+}
+{% endhighlight %}
The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped-by-key input stream of the form
@@ -160,9 +161,11 @@ the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
The `ListCheckpointed` interface requires the implementation of two methods:
- List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
- void restoreState(List<T> state) throws Exception;
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
@@ -170,53 +173,55 @@ is that now `snapshotState()` should return a list of objects to checkpoint, as
return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink`
is included below:
- public class BufferingSinkListCheckpointed implements
- SinkFunction<Tuple2<String, Integer>>,
- ListCheckpointed<Tuple2<String, Integer>>,
- CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSinkListCheckpointed implements
+ SinkFunction<Tuple2<String, Integer>>,
+ ListCheckpointed<Tuple2<String, Integer>>,
+ CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
- private final int threshold;
+ private final int threshold;
- private transient ListState<Tuple2<String, Integer>> checkpointedState;
+ private transient ListState<Tuple2<String, Integer>> checkpointedState;
- private List<Tuple2<String, Integer>> bufferedElements;
+ private List<Tuple2<String, Integer>> bufferedElements;
- public BufferingSinkListCheckpointed(int threshold) {
- this.threshold = threshold;
- this.bufferedElements = new ArrayList<>();
- }
+ public BufferingSinkListCheckpointed(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
- @Override
- public void invoke(Tuple2<String, Integer> value) throws Exception {
- this.bufferedElements.add(value);
- if (bufferedElements.size() == threshold) {
- for (Tuple2<String, Integer> element: bufferedElements) {
- // send it to the sink
- }
- bufferedElements.clear();
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception {
+ this.bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
}
+ bufferedElements.clear();
}
+ }
- @Override
- public List<Tuple2<String, Integer>> snapshotState(
- long checkpointId, long timestamp) throws Exception {
- return this.bufferedElements;
- }
-
- @Override
- public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
- if (!state.isEmpty()) {
- this.bufferedElements.addAll(state);
- }
- }
+ @Override
+ public List<Tuple2<String, Integer>> snapshotState(
+ long checkpointId, long timestamp) throws Exception {
+ return this.bufferedElements;
+ }
- @Override
- public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
- // this is from the CheckpointedRestoring interface.
+ @Override
+ public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
+ if (!state.isEmpty()) {
this.bufferedElements.addAll(state);
}
}
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+ // this is from the CheckpointedRestoring interface.
+ this.bufferedElements.addAll(state);
+ }
+}
+{% endhighlight %}
+
As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
compatibility reasons and more details will be explained at the end of this section.
@@ -224,9 +229,11 @@ compatibility reasons and more details will be explained at the end of this sect
The `CheckpointedFunction` interface requires again the implementation of two methods:
- void snapshotState(FunctionSnapshotContext context) throws Exception;
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
- void initializeState(FunctionInitializationContext context) throws Exception;
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only
@@ -234,57 +241,59 @@ in the case that we are recovering from a failure. Given this, `initializeState(
types of state are initialized, but also where state recovery logic is included. An implementation of the
`CheckpointedFunction` interface for `BufferingSink` is presented below.
- public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
- CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+ CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
- private final int threshold;
+ private final int threshold;
- private transient ListState<Tuple2<String, Integer>> checkpointedState;
+ private transient ListState<Tuple2<String, Integer>> checkpointedState;
- private List<Tuple2<String, Integer>> bufferedElements;
+ private List<Tuple2<String, Integer>> bufferedElements;
- public BufferingSink(int threshold) {
- this.threshold = threshold;
- this.bufferedElements = new ArrayList<>();
- }
+ public BufferingSink(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
- @Override
- public void invoke(Tuple2<String, Integer> value) throws Exception {
- bufferedElements.add(value);
- if (bufferedElements.size() == threshold) {
- for (Tuple2<String, Integer> element: bufferedElements) {
- // send it to the sink
- }
- bufferedElements.clear();
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception {
+ bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
}
+ bufferedElements.clear();
}
+ }
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- checkpointedState.clear();
- for (Tuple2<String, Integer> element : bufferedElements) {
- checkpointedState.add(element);
- }
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ checkpointedState.clear();
+ for (Tuple2<String, Integer> element : bufferedElements) {
+ checkpointedState.add(element);
}
+ }
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- checkpointedState = context.getOperatorStateStore().
- getSerializableListState("buffered-elements");
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ checkpointedState = context.getOperatorStateStore().
+ getSerializableListState("buffered-elements");
- if (context.isRestored()) {
- for (Tuple2<String, Integer> element : checkpointedState.get()) {
- bufferedElements.add(element);
- }
+ if (context.isRestored()) {
+ for (Tuple2<String, Integer> element : checkpointedState.get()) {
+ bufferedElements.add(element);
}
}
+ }
- @Override
- public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
- // this is from the CheckpointedRestoring interface.
- this.bufferedElements.addAll(state);
- }
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+ // this is from the CheckpointedRestoring interface.
+ this.bufferedElements.addAll(state);
}
+}
+{% endhighlight %}
The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize
the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
@@ -305,40 +314,41 @@ for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `Co
the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
would look like this:
- public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
- implements CheckpointedFunction {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
+ implements CheckpointedFunction {
- private transient ValueState<Integer> counter;
+ private transient ValueState<Integer> counter;
- private final int numberElements;
+ private final int numberElements;
- public CountMapper(int numberElements) {
- this.numberElements = numberElements;
- }
+ public CountMapper(int numberElements) {
+ this.numberElements = numberElements;
+ }
- @Override
- public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
- int count = counter.value() + 1;
- counter.update(count);
+ @Override
+ public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+ int count = counter.value() + 1;
+ counter.update(count);
- if (count % numberElements == 0) {
- out.collect(Tuple2.of(value.f0, count));
- counter.update(0); // reset to 0
- }
- }
+ if (count % numberElements == 0) {
+ out.collect(Tuple2.of(value.f0, count));
+ counter.update(0); // reset to 0
}
+ }
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- //all managed, nothing to do.
- }
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ // all managed, nothing to do.
+ }
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- counter = context.getKeyedStateStore().getState(
- new ValueStateDescriptor<>("counter", Integer.class, 0));
- }
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ counter = context.getKeyedStateStore().getState(
+ new ValueStateDescriptor<>("counter", Integer.class, 0));
}
+}
+{% endhighlight %}
Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
upon checkpointing.
http://git-wip-us.apache.org/repos/asf/flink/blob/9fbd08b5/docs/monitoring/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/best_practices.md b/docs/monitoring/best_practices.md
index 0bd362e..5a8bcbc 100644
--- a/docs/monitoring/best_practices.md
+++ b/docs/monitoring/best_practices.md
@@ -59,8 +59,8 @@ ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line.
{% highlight java %}
public static void main(String[] args) {
- ParameterTool parameter = ParameterTool.fromArgs(args);
- // .. regular code ..
+ ParameterTool parameter = ParameterTool.fromArgs(args);
+ // .. regular code ..
{% endhighlight %}
@@ -114,17 +114,18 @@ The example below shows how to pass the parameters as a `Configuration` object t
{% highlight java %}
ParameterTool parameters = ParameterTool.fromArgs(args);
-DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
+DataSet<Tuple2<String, Integer>> counts = text
+ .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
{% endhighlight %}
In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method:
{% highlight java %}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void open(Configuration parameters) throws Exception {
- parameters.getInteger("myInt", -1);
- // .. do
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ parameters.getInteger("myInt", -1);
+ // .. do
{% endhighlight %}
@@ -147,11 +148,12 @@ Access them in any rich user function:
{% highlight java %}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
- parameters.getRequired("input");
- // .. do more ..
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ ParameterTool parameters = (ParameterTool)
+ getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ parameters.getRequired("input");
+ // .. do more ..
{% endhighlight %}
@@ -198,8 +200,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
- private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
- // ...
+ private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
+ // ...
{% endhighlight %}