Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/10 18:59:22 UTC
[1/2] flink git commit: [FLINK-6330] [docs] Add basic Docker, K8s docs
Repository: flink
Updated Branches:
refs/heads/release-1.2 a820f662d -> 9fbd08b58
[FLINK-6330] [docs] Add basic Docker, K8s docs
This closes #3751
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/048c0a3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/048c0a3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/048c0a3e
Branch: refs/heads/release-1.2
Commit: 048c0a3e7390e9a8c991b32a8bdaf2648a4b564d
Parents: a820f66
Author: Patrick Lucas <me...@patricklucas.com>
Authored: Fri Apr 21 15:00:53 2017 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed May 10 14:38:47 2017 -0400
----------------------------------------------------------------------
docs/docker/run.sh | 4 +-
docs/setup/docker.md | 99 ++++++++++++++++++++++++++
docs/setup/kubernetes.md | 157 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 259 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/048c0a3e/docs/docker/run.sh
----------------------------------------------------------------------
diff --git a/docs/docker/run.sh b/docs/docker/run.sh
index 3c8878a..5598c0a 100755
--- a/docs/docker/run.sh
+++ b/docs/docker/run.sh
@@ -31,10 +31,12 @@ if [ "$(uname -s)" == "Linux" ]; then
USER_NAME=${SUDO_USER:=$USER}
USER_ID=$(id -u "${USER_NAME}")
GROUP_ID=$(id -g "${USER_NAME}")
+ LOCAL_HOME="/home/${USER_NAME}"
else # boot2docker uid and gid
USER_NAME=$USER
USER_ID=1000
GROUP_ID=50
+ LOCAL_HOME="/Users/${USER_NAME}"
fi
docker build -t "${IMAGE_NAME}-${USER_NAME}" - <<UserSpecificDocker
@@ -65,7 +67,7 @@ docker run -i -t \
-w ${FLINK_DOC_ROOT} \
-u "${USER}" \
-v "${FLINK_DOC_ROOT}:${FLINK_DOC_ROOT}" \
- -v "/home/${USER_NAME}:/home/${USER_NAME}" \
+ -v "${LOCAL_HOME}:/home/${USER_NAME}" \
-p 4000:4000 \
${IMAGE_NAME}-${USER_NAME} \
bash -c "${CMD}"
http://git-wip-us.apache.org/repos/asf/flink/blob/048c0a3e/docs/setup/docker.md
----------------------------------------------------------------------
diff --git a/docs/setup/docker.md b/docs/setup/docker.md
new file mode 100644
index 0000000..29e696f
--- /dev/null
+++ b/docs/setup/docker.md
@@ -0,0 +1,99 @@
+---
+title: "Docker Setup"
+nav-title: Docker
+nav-parent_id: deployment
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Docker](https://www.docker.com) is a popular container runtime. There are
+official Flink Docker images available on Docker Hub which can be used directly
+or extended to better integrate into a production environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Official Flink Docker Images
+
+The [official Flink Docker repository](https://hub.docker.com/_/flink/) is
+hosted on Docker Hub and serves images of Flink version 1.2.1 and later.
+
+Images for each supported combination of Hadoop and Scala are available, and
+tag aliases are provided for convenience.
+
+For example, the following aliases can be used: *(`1.2.y` indicates the latest
+release of Flink 1.2)*
+
+* `flink:latest` →
+`flink:<latest-flink>-hadoop<latest-hadoop>-scala_<latest-scala>`
+* `flink:1.2` → `flink:1.2.y-hadoop27-scala_2.11`
+* `flink:1.2.1-scala_2.10` → `flink:1.2.1-hadoop27-scala_2.10`
+* `flink:1.2-hadoop26` → `flink:1.2.y-hadoop26-scala_2.11`
+
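+For example, to pull the image for a specific combination and start a local
+Jobmanager from it *(a sketch; any tag from the repository can be substituted,
+and `jobmanager` is the image's entrypoint argument, as in the Kubernetes
+examples)*:
+
+    docker pull flink:1.2.1-hadoop27-scala_2.11
+    docker run --rm -t -p 8081:8081 flink:1.2.1-hadoop27-scala_2.11 jobmanager
+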
+<!-- NOTE: uncomment when docker-flink/docker-flink/issues/14 is resolved. -->
+<!--
+Additionally, images based on Alpine Linux are available. Reference them by
+appending `-alpine` to the tag. For the Alpine version of `flink:latest`, use
+`flink:alpine`.
+
+For example:
+
+* `flink:alpine`
+* `flink:1.2.1-alpine`
+* `flink:1.2-scala_2.10-alpine`
+-->
+
+## Flink with Docker Compose
+
+[Docker Compose](https://docs.docker.com/compose/) is a convenient way to run a
+group of Docker containers locally.
+
+An [example config file](https://github.com/docker-flink/examples) is available
+on GitHub.
+
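+A minimal `docker-compose.yml` might look like the following *(an illustrative
+sketch, not the linked file; it assumes the official image and its
+`jobmanager`/`taskmanager` entrypoint arguments)*:
+
+    version: "2"
+    services:
+      jobmanager:
+        image: flink:latest
+        command: jobmanager
+        ports:
+          - "8081:8081"
+        environment:
+          - JOB_MANAGER_RPC_ADDRESS=jobmanager
+      taskmanager:
+        image: flink:latest
+        command: taskmanager
+        environment:
+          - JOB_MANAGER_RPC_ADDRESS=jobmanager
+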
+### Usage
+
+* Launch a cluster in the foreground
+
+ docker-compose up
+
+* Launch a cluster in the background
+
+ docker-compose up -d
+
+* Scale the cluster up or down to *N* TaskManagers
+
+ docker-compose scale taskmanager=<N>
+
+When the cluster is running, you can visit the web UI at
+[http://localhost:8081](http://localhost:8081) and submit a job.
+
+To submit a job via the command line, you must copy the JAR to the Jobmanager
+container and submit the job from there.
+
+For example:
+
+{% raw %}
+ $ JOBMANAGER_CONTAINER=$(docker ps --filter name=jobmanager --format={{.ID}})
+ $ docker cp path/to/jar "$JOBMANAGER_CONTAINER":/job.jar
+ $ docker exec -t -i "$JOBMANAGER_CONTAINER" flink run /job.jar
+{% endraw %}
+
+{% top %}
http://git-wip-us.apache.org/repos/asf/flink/blob/048c0a3e/docs/setup/kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/setup/kubernetes.md b/docs/setup/kubernetes.md
new file mode 100644
index 0000000..0790a05
--- /dev/null
+++ b/docs/setup/kubernetes.md
@@ -0,0 +1,157 @@
+---
+title: "Kubernetes Setup"
+nav-title: Kubernetes
+nav-parent_id: deployment
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Kubernetes](https://kubernetes.io) is a container orchestration system.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Simple Kubernetes Flink Cluster
+
+A basic Flink cluster deployment in Kubernetes has three components:
+
+* a Deployment for a single Jobmanager
+* a Deployment for a pool of Taskmanagers
+* a Service exposing the Jobmanager's RPC, blob, query, and UI ports
+
+### Launching the cluster
+
+Using the [resource definitions found
+below](#simple-kubernetes-flink-cluster-resources), launch the cluster with
+the `kubectl` command:
+
+ kubectl create -f jobmanager-deployment.yaml
+ kubectl create -f taskmanager-deployment.yaml
+ kubectl create -f jobmanager-service.yaml
+
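+If needed, resize the Taskmanager pool to *N* replicas with `kubectl scale`
+*(the Deployment name comes from the resource definitions below)*:
+
+    kubectl scale deployment flink-taskmanager --replicas=<N>
+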
+You can then access the Flink UI via `kubectl proxy`:
+
+1. Run `kubectl proxy` in a terminal
+2. Navigate to [http://localhost:8001/api/v1/proxy/namespaces/default/services/flink-jobmanager:8081](http://localhost:8001/api/v1/proxy/namespaces/default/services/flink-jobmanager:8081) in your browser
+
+### Deleting the cluster
+
+Again, use `kubectl` to delete the cluster:
+
+ kubectl delete -f jobmanager-deployment.yaml
+ kubectl delete -f taskmanager-deployment.yaml
+ kubectl delete -f jobmanager-service.yaml
+
+## Advanced Cluster Deployment
+
+An early version of a [Flink Helm chart](https://github.com/docker-flink/examples)
+is available on GitHub.
+
+## Appendix
+
+### Simple Kubernetes Flink cluster resources
+
+`jobmanager-deployment.yaml`
+{% highlight yaml %}
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+ name: flink-jobmanager
+spec:
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: flink
+ component: jobmanager
+ spec:
+ containers:
+ - name: jobmanager
+ image: flink:latest
+ args:
+ - jobmanager
+ ports:
+ - containerPort: 6123
+ name: rpc
+ - containerPort: 6124
+ name: blob
+ - containerPort: 6125
+ name: query
+ - containerPort: 8081
+ name: ui
+ env:
+ - name: JOB_MANAGER_RPC_ADDRESS
+ value: flink-jobmanager
+{% endhighlight %}
+
+`taskmanager-deployment.yaml`
+{% highlight yaml %}
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+ name: flink-taskmanager
+spec:
+ replicas: 2
+ template:
+ metadata:
+ labels:
+ app: flink
+ component: taskmanager
+ spec:
+ containers:
+ - name: taskmanager
+ image: flink:latest
+ args:
+ - taskmanager
+ ports:
+ - containerPort: 6121
+ name: data
+ - containerPort: 6122
+ name: rpc
+ - containerPort: 6125
+ name: query
+ env:
+ - name: JOB_MANAGER_RPC_ADDRESS
+ value: flink-jobmanager
+{% endhighlight %}
+
+`jobmanager-service.yaml`
+{% highlight yaml %}
+apiVersion: v1
+kind: Service
+metadata:
+ name: flink-jobmanager
+spec:
+ ports:
+ - name: rpc
+ port: 6123
+ - name: blob
+ port: 6124
+ - name: query
+ port: 6125
+ - name: ui
+ port: 8081
+ selector:
+ app: flink
+ component: jobmanager
+{% endhighlight %}
+
+{% top %}
[2/2] flink git commit: [FLINK-6512] [docs] improved code formatting in some examples
Posted by gr...@apache.org.
[FLINK-6512] [docs] improved code formatting in some examples
This closes #3857
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fbd08b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fbd08b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fbd08b5
Branch: refs/heads/release-1.2
Commit: 9fbd08b58f4ead2dddcc283f99385fa5be94eecf
Parents: 048c0a3
Author: David Anderson <da...@alpinegizmo.com>
Authored: Tue May 9 17:23:46 2017 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed May 10 14:38:48 2017 -0400
----------------------------------------------------------------------
docs/dev/migration.md | 300 +++++++++++++++++----------------
docs/monitoring/best_practices.md | 30 ++--
2 files changed, 171 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9fbd08b5/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index a5910a8..11eb42c 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -51,69 +51,70 @@ As running examples for the remainder of this document we will use the `CountMap
functions. The first is an example of a function with **keyed** state, while
the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
- public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
- private transient ValueState<Integer> counter;
+ private transient ValueState<Integer> counter;
- private final int numberElements;
+ private final int numberElements;
- public CountMapper(int numberElements) {
- this.numberElements = numberElements;
- }
+ public CountMapper(int numberElements) {
+ this.numberElements = numberElements;
+ }
- @Override
- public void open(Configuration parameters) throws Exception {
- counter = getRuntimeContext().getState(
- new ValueStateDescriptor<>("counter", Integer.class, 0));
- }
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ counter = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("counter", Integer.class, 0));
+ }
- @Override
- public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
- int count = counter.value() + 1;
- counter.update(count);
+ @Override
+ public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+ int count = counter.value() + 1;
+ counter.update(count);
- if (count % numberElements == 0) {
- out.collect(Tuple2.of(value.f0, count));
- counter.update(0); // reset to 0
- }
+ if (count % numberElements == 0) {
+ out.collect(Tuple2.of(value.f0, count));
+ counter.update(0); // reset to 0
}
}
+}
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+ Checkpointed<ArrayList<Tuple2<String, Integer>>> {
- public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
- Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
- private final int threshold;
+ private final int threshold;
- private ArrayList<Tuple2<String, Integer>> bufferedElements;
+ private ArrayList<Tuple2<String, Integer>> bufferedElements;
- BufferingSink(int threshold) {
- this.threshold = threshold;
- this.bufferedElements = new ArrayList<>();
- }
+ BufferingSink(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
- @Override
- public void invoke(Tuple2<String, Integer> value) throws Exception {
- bufferedElements.add(value);
- if (bufferedElements.size() == threshold) {
- for (Tuple2<String, Integer> element: bufferedElements) {
- // send it to the sink
- }
- bufferedElements.clear();
- }
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception {
+ bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
}
+ bufferedElements.clear();
+ }
+ }
- @Override
- public ArrayList<Tuple2<String, Integer>> snapshotState(
- long checkpointId, long checkpointTimestamp) throws Exception {
- return bufferedElements;
- }
+ @Override
+ public ArrayList<Tuple2<String, Integer>> snapshotState(
+ long checkpointId, long checkpointTimestamp) throws Exception {
+ return bufferedElements;
+ }
- @Override
- public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
- bufferedElements.addAll(state);
- }
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+ bufferedElements.addAll(state);
}
+}
+{% endhighlight %}
The `CountMapper` is a `RichFlatMapFunction` that assumes a grouped-by-key input stream of the form
@@ -160,9 +161,11 @@ the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
The `ListCheckpointed` interface requires the implementation of two methods:
- List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
- void restoreState(List<T> state) throws Exception;
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
@@ -170,53 +173,55 @@ is that now `snapshotState()` should return a list of objects to checkpoint, as
return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink`
is included below:
- public class BufferingSinkListCheckpointed implements
- SinkFunction<Tuple2<String, Integer>>,
- ListCheckpointed<Tuple2<String, Integer>>,
- CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSinkListCheckpointed implements
+ SinkFunction<Tuple2<String, Integer>>,
+ ListCheckpointed<Tuple2<String, Integer>>,
+ CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
- private final int threshold;
+ private final int threshold;
- private transient ListState<Tuple2<String, Integer>> checkpointedState;
+ private transient ListState<Tuple2<String, Integer>> checkpointedState;
- private List<Tuple2<String, Integer>> bufferedElements;
+ private List<Tuple2<String, Integer>> bufferedElements;
- public BufferingSinkListCheckpointed(int threshold) {
- this.threshold = threshold;
- this.bufferedElements = new ArrayList<>();
- }
+ public BufferingSinkListCheckpointed(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
- @Override
- public void invoke(Tuple2<String, Integer> value) throws Exception {
- this.bufferedElements.add(value);
- if (bufferedElements.size() == threshold) {
- for (Tuple2<String, Integer> element: bufferedElements) {
- // send it to the sink
- }
- bufferedElements.clear();
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception {
+ this.bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
}
+ bufferedElements.clear();
}
+ }
- @Override
- public List<Tuple2<String, Integer>> snapshotState(
- long checkpointId, long timestamp) throws Exception {
- return this.bufferedElements;
- }
-
- @Override
- public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
- if (!state.isEmpty()) {
- this.bufferedElements.addAll(state);
- }
- }
+ @Override
+ public List<Tuple2<String, Integer>> snapshotState(
+ long checkpointId, long timestamp) throws Exception {
+ return this.bufferedElements;
+ }
- @Override
- public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
- // this is from the CheckpointedRestoring interface.
+ @Override
+ public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
+ if (!state.isEmpty()) {
this.bufferedElements.addAll(state);
}
}
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+ // this is from the CheckpointedRestoring interface.
+ this.bufferedElements.addAll(state);
+ }
+}
+{% endhighlight %}
+
As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
compatibility reasons and more details will be explained at the end of this section.
@@ -224,9 +229,11 @@ compatibility reasons and more details will be explained at the end of this sect
The `CheckpointedFunction` interface requires again the implementation of two methods:
- void snapshotState(FunctionSnapshotContext context) throws Exception;
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
- void initializeState(FunctionInitializationContext context) throws Exception;
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only
@@ -234,57 +241,59 @@ in the case that we are recovering from a failure. Given this, `initializeState(
types of state are initialized, but also where state recovery logic is included. An implementation of the
`CheckpointedFunction` interface for `BufferingSink` is presented below.
- public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
- CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+ CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
- private final int threshold;
+ private final int threshold;
- private transient ListState<Tuple2<String, Integer>> checkpointedState;
+ private transient ListState<Tuple2<String, Integer>> checkpointedState;
- private List<Tuple2<String, Integer>> bufferedElements;
+ private List<Tuple2<String, Integer>> bufferedElements;
- public BufferingSink(int threshold) {
- this.threshold = threshold;
- this.bufferedElements = new ArrayList<>();
- }
+ public BufferingSink(int threshold) {
+ this.threshold = threshold;
+ this.bufferedElements = new ArrayList<>();
+ }
- @Override
- public void invoke(Tuple2<String, Integer> value) throws Exception {
- bufferedElements.add(value);
- if (bufferedElements.size() == threshold) {
- for (Tuple2<String, Integer> element: bufferedElements) {
- // send it to the sink
- }
- bufferedElements.clear();
+ @Override
+ public void invoke(Tuple2<String, Integer> value) throws Exception {
+ bufferedElements.add(value);
+ if (bufferedElements.size() == threshold) {
+ for (Tuple2<String, Integer> element: bufferedElements) {
+ // send it to the sink
}
+ bufferedElements.clear();
}
+ }
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- checkpointedState.clear();
- for (Tuple2<String, Integer> element : bufferedElements) {
- checkpointedState.add(element);
- }
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ checkpointedState.clear();
+ for (Tuple2<String, Integer> element : bufferedElements) {
+ checkpointedState.add(element);
}
+ }
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- checkpointedState = context.getOperatorStateStore().
- getSerializableListState("buffered-elements");
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ checkpointedState = context.getOperatorStateStore().
+ getSerializableListState("buffered-elements");
- if (context.isRestored()) {
- for (Tuple2<String, Integer> element : checkpointedState.get()) {
- bufferedElements.add(element);
- }
+ if (context.isRestored()) {
+ for (Tuple2<String, Integer> element : checkpointedState.get()) {
+ bufferedElements.add(element);
}
}
+ }
- @Override
- public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
- // this is from the CheckpointedRestoring interface.
- this.bufferedElements.addAll(state);
- }
+ @Override
+ public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+ // this is from the CheckpointedRestoring interface.
+ this.bufferedElements.addAll(state);
}
+}
+{% endhighlight %}
The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize
the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
@@ -305,40 +314,41 @@ for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `Co
the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
would look like this:
- public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
- implements CheckpointedFunction {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
+ implements CheckpointedFunction {
- private transient ValueState<Integer> counter;
+ private transient ValueState<Integer> counter;
- private final int numberElements;
+ private final int numberElements;
- public CountMapper(int numberElements) {
- this.numberElements = numberElements;
- }
+ public CountMapper(int numberElements) {
+ this.numberElements = numberElements;
+ }
- @Override
- public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
- int count = counter.value() + 1;
- counter.update(count);
+ @Override
+ public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+ int count = counter.value() + 1;
+ counter.update(count);
- if (count % numberElements == 0) {
- out.collect(Tuple2.of(value.f0, count));
- counter.update(0); // reset to 0
- }
- }
+ if (count % numberElements == 0) {
+ out.collect(Tuple2.of(value.f0, count));
+ counter.update(0); // reset to 0
}
+ }
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- //all managed, nothing to do.
- }
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ // all managed, nothing to do.
+ }
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- counter = context.getKeyedStateStore().getState(
- new ValueStateDescriptor<>("counter", Integer.class, 0));
- }
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ counter = context.getKeyedStateStore().getState(
+ new ValueStateDescriptor<>("counter", Integer.class, 0));
}
+}
+{% endhighlight %}
Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
upon checkpointing.
http://git-wip-us.apache.org/repos/asf/flink/blob/9fbd08b5/docs/monitoring/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/best_practices.md b/docs/monitoring/best_practices.md
index 0bd362e..5a8bcbc 100644
--- a/docs/monitoring/best_practices.md
+++ b/docs/monitoring/best_practices.md
@@ -59,8 +59,8 @@ ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line.
{% highlight java %}
public static void main(String[] args) {
- ParameterTool parameter = ParameterTool.fromArgs(args);
- // .. regular code ..
+ ParameterTool parameter = ParameterTool.fromArgs(args);
+ // .. regular code ..
{% endhighlight %}
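+
+The values can then be read back from the `ParameterTool`, e.g.
+(a brief sketch; `getRequired()` and `getInt()` are existing `ParameterTool`
+accessors):
+
+{% highlight java %}
+String input = parameter.getRequired("input");  // fails fast if missing
+int elements = parameter.getInt("elements", 0); // falls back to a default
+{% endhighlight %}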
@@ -114,17 +114,18 @@ The example below shows how to pass the parameters as a `Configuration` object t
{% highlight java %}
ParameterTool parameters = ParameterTool.fromArgs(args);
-DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
+DataSet<Tuple2<String, Integer>> counts = text
+ .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
{% endhighlight %}
In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method:
{% highlight java %}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void open(Configuration parameters) throws Exception {
- parameters.getInteger("myInt", -1);
- // .. do
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ parameters.getInteger("myInt", -1);
+ // .. do
{% endhighlight %}
@@ -147,11 +148,12 @@ Access them in any rich user function:
{% highlight java %}
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
- parameters.getRequired("input");
- // .. do more ..
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ ParameterTool parameters = (ParameterTool)
+ getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+ parameters.getRequired("input");
+ // .. do more ..
{% endhighlight %}
@@ -198,8 +200,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
- private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
- // ...
+ private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
+ // ...
{% endhighlight %}