Posted to commits@flink.apache.org by gr...@apache.org on 2017/05/10 18:59:22 UTC

[1/2] flink git commit: [FLINK-6330] [docs] Add basic Docker, K8s docs

Repository: flink
Updated Branches:
  refs/heads/release-1.2 a820f662d -> 9fbd08b58


[FLINK-6330] [docs] Add basic Docker, K8s docs

This closes #3751


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/048c0a3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/048c0a3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/048c0a3e

Branch: refs/heads/release-1.2
Commit: 048c0a3e7390e9a8c991b32a8bdaf2648a4b564d
Parents: a820f66
Author: Patrick Lucas <me...@patricklucas.com>
Authored: Fri Apr 21 15:00:53 2017 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed May 10 14:38:47 2017 -0400

----------------------------------------------------------------------
 docs/docker/run.sh       |   4 +-
 docs/setup/docker.md     |  99 ++++++++++++++++++++++++++
 docs/setup/kubernetes.md | 157 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 259 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/048c0a3e/docs/docker/run.sh
----------------------------------------------------------------------
diff --git a/docs/docker/run.sh b/docs/docker/run.sh
index 3c8878a..5598c0a 100755
--- a/docs/docker/run.sh
+++ b/docs/docker/run.sh
@@ -31,10 +31,12 @@ if [ "$(uname -s)" == "Linux" ]; then
   USER_NAME=${SUDO_USER:=$USER}
   USER_ID=$(id -u "${USER_NAME}")
   GROUP_ID=$(id -g "${USER_NAME}")
+  LOCAL_HOME="/home/${USER_NAME}"
 else # boot2docker uid and gid
   USER_NAME=$USER
   USER_ID=1000
   GROUP_ID=50
+  LOCAL_HOME="/Users/${USER_NAME}"
 fi
 
 docker build -t "${IMAGE_NAME}-${USER_NAME}" - <<UserSpecificDocker
@@ -65,7 +67,7 @@ docker run -i -t \
   -w ${FLINK_DOC_ROOT} \
   -u "${USER}" \
   -v "${FLINK_DOC_ROOT}:${FLINK_DOC_ROOT}" \
-  -v "/home/${USER_NAME}:/home/${USER_NAME}" \
+  -v "${LOCAL_HOME}:/home/${USER_NAME}" \
   -p 4000:4000 \
   ${IMAGE_NAME}-${USER_NAME} \
   bash -c "${CMD}"

http://git-wip-us.apache.org/repos/asf/flink/blob/048c0a3e/docs/setup/docker.md
----------------------------------------------------------------------
diff --git a/docs/setup/docker.md b/docs/setup/docker.md
new file mode 100644
index 0000000..29e696f
--- /dev/null
+++ b/docs/setup/docker.md
@@ -0,0 +1,99 @@
+---
+title:  "Docker Setup"
+nav-title: Docker
+nav-parent_id: deployment
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Docker](https://www.docker.com) is a popular container runtime. There are
+official Flink Docker images available on Docker Hub which can be used directly
+or extended to better integrate into a production environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Official Flink Docker Images
+
+The [official Flink Docker repository](https://hub.docker.com/_/flink/) is
+hosted on Docker Hub and serves images of Flink version 1.2.1 and later.
+
+Images for each supported combination of Hadoop and Scala are available, and
+tag aliases are provided for convenience.
+
+For example, the following aliases can be used: *(`1.2.y` indicates the latest
+release of Flink 1.2)*
+
+* `flink:latest` → `flink:<latest-flink>-hadoop<latest-hadoop>-scala_<latest-scala>`
+* `flink:1.2` → `flink:1.2.y-hadoop27-scala_2.11`
+* `flink:1.2.1-scala_2.10` → `flink:1.2.1-hadoop27-scala_2.10`
+* `flink:1.2-hadoop26` → `flink:1.2.y-hadoop26-scala_2.11`
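+
+For example, to pull a pinned image and run a standalone Jobmanager from it
+(a minimal sketch; the container name and published port here are
+illustrative):
+
+    docker pull flink:1.2.1-hadoop27-scala_2.11
+    docker run -d --name flink-jobmanager -p 8081:8081 flink:1.2.1-hadoop27-scala_2.11 jobmanager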
+
+<!-- NOTE: uncomment when docker-flink/docker-flink/issues/14 is resolved. -->
+<!--
+Additionally, images based on Alpine Linux are available. Reference them by
+appending `-alpine` to the tag. For the Alpine version of `flink:latest`, use
+`flink:alpine`.
+
+For example:
+
+* `flink:alpine`
+* `flink:1.2.1-alpine`
+* `flink:1.2-scala_2.10-alpine`
+-->
+
+## Flink with Docker Compose
+
+[Docker Compose](https://docs.docker.com/compose/) is a convenient way to run a
+group of Docker containers locally.
+
+An [example config file](https://github.com/docker-flink/examples) is available
+on GitHub.
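+
+A minimal `docker-compose.yml` along these lines might look like the following
+sketch (it assumes the official `flink` image and its `jobmanager` and
+`taskmanager` commands; the actual example file on GitHub may differ):
+
+    version: "2.1"
+    services:
+      jobmanager:
+        image: flink
+        command: jobmanager
+        ports:
+          - "8081:8081"
+        environment:
+          - JOB_MANAGER_RPC_ADDRESS=jobmanager
+      taskmanager:
+        image: flink
+        command: taskmanager
+        depends_on:
+          - jobmanager
+        environment:
+          - JOB_MANAGER_RPC_ADDRESS=jobmanager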
+
+### Usage
+
+* Launch a cluster in the foreground
+
+        docker-compose up
+
+* Launch a cluster in the background
+
+        docker-compose up -d
+
+* Scale the cluster up or down to *N* TaskManagers
+
+        docker-compose scale taskmanager=<N>
+
+When the cluster is running, you can visit the web UI at [http://localhost:8081](http://localhost:8081) and submit a job.
+
+To submit a job via the command line, you must copy the JAR to the Jobmanager
+container and submit the job from there.
+
+For example:
+
+{% raw %}
+    $ JOBMANAGER_CONTAINER=$(docker ps --filter name=jobmanager --format={{.ID}})
+    $ docker cp path/to/jar "$JOBMANAGER_CONTAINER":/job.jar
+    $ docker exec -t -i "$JOBMANAGER_CONTAINER" flink run /job.jar
+{% endraw %}
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/048c0a3e/docs/setup/kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/setup/kubernetes.md b/docs/setup/kubernetes.md
new file mode 100644
index 0000000..0790a05
--- /dev/null
+++ b/docs/setup/kubernetes.md
@@ -0,0 +1,157 @@
+---
+title:  "Kubernetes Setup"
+nav-title: Kubernetes
+nav-parent_id: deployment
+nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Kubernetes](https://kubernetes.io) is a container orchestration system.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Simple Kubernetes Flink Cluster
+
+A basic Flink cluster deployment in Kubernetes has three components:
+
+* a Deployment for a single Jobmanager
+* a Deployment for a pool of Taskmanagers
+* a Service exposing the Jobmanager's RPC and UI ports
+
+### Launching the cluster
+
+Using the [resource definitions found below](#simple-kubernetes-flink-cluster-resources), launch the cluster with the `kubectl` command:
+
+    kubectl create -f jobmanager-deployment.yaml
+    kubectl create -f taskmanager-deployment.yaml
+    kubectl create -f jobmanager-service.yaml
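+
+You can verify that the pods have started, and resize the Taskmanager pool,
+with standard `kubectl` commands (a sketch; the labels and the deployment name
+come from the resource definitions in the appendix):
+
+    kubectl get pods -l app=flink
+    kubectl scale deployment flink-taskmanager --replicas=4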
+
+You can then access the Flink UI via `kubectl proxy`:
+
+1. Run `kubectl proxy` in a terminal
+2. Navigate to [http://localhost:8001/api/v1/proxy/namespaces/default/services/flink-jobmanager:8081](http://localhost:8001/api/v1/proxy/namespaces/default/services/flink-jobmanager:8081) in your browser
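+
+Alternatively, you can forward the Jobmanager UI port to your local machine
+(a sketch; the pod name is looked up via the labels from the appendix):
+
+    kubectl port-forward $(kubectl get pods -l component=jobmanager -o jsonpath='{.items[0].metadata.name}') 8081:8081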
+
+### Deleting the cluster
+
+Again, use `kubectl` to delete the cluster:
+
+    kubectl delete -f jobmanager-deployment.yaml
+    kubectl delete -f taskmanager-deployment.yaml
+    kubectl delete -f jobmanager-service.yaml
+
+## Advanced Cluster Deployment
+
+An early version of a [Flink Helm chart](https://github.com/docker-flink/examples) is available on GitHub.
+
+## Appendix
+
+### Simple Kubernetes Flink cluster resources
+
+`jobmanager-deployment.yaml`
+{% highlight yaml %}
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: flink-jobmanager
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        app: flink
+        component: jobmanager
+    spec:
+      containers:
+      - name: jobmanager
+        image: flink:latest
+        args:
+        - jobmanager
+        ports:
+        - containerPort: 6123
+          name: rpc
+        - containerPort: 6124
+          name: blob
+        - containerPort: 6125
+          name: query
+        - containerPort: 8081
+          name: ui
+        env:
+        - name: JOB_MANAGER_RPC_ADDRESS
+          value: flink-jobmanager
+{% endhighlight %}
+
+`taskmanager-deployment.yaml`
+{% highlight yaml %}
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: flink-taskmanager
+spec:
+  replicas: 2
+  template:
+    metadata:
+      labels:
+        app: flink
+        component: taskmanager
+    spec:
+      containers:
+      - name: taskmanager
+        image: flink:latest
+        args:
+        - taskmanager
+        ports:
+        - containerPort: 6121
+          name: data
+        - containerPort: 6122
+          name: rpc
+        - containerPort: 6125
+          name: query
+        env:
+        - name: JOB_MANAGER_RPC_ADDRESS
+          value: flink-jobmanager
+{% endhighlight %}
+
+`jobmanager-service.yaml`
+{% highlight yaml %}
+apiVersion: v1
+kind: Service
+metadata:
+  name: flink-jobmanager
+spec:
+  ports:
+  - name: rpc
+    port: 6123
+  - name: blob
+    port: 6124
+  - name: query
+    port: 6125
+  - name: ui
+    port: 8081
+  selector:
+    app: flink
+    component: jobmanager
+{% endhighlight %}
+
+{% top %}


[2/2] flink git commit: [FLINK-6512] [docs] improved code formatting in some examples

Posted by gr...@apache.org.
[FLINK-6512] [docs] improved code formatting in some examples

This closes #3857


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fbd08b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fbd08b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fbd08b5

Branch: refs/heads/release-1.2
Commit: 9fbd08b58f4ead2dddcc283f99385fa5be94eecf
Parents: 048c0a3
Author: David Anderson <da...@alpinegizmo.com>
Authored: Tue May 9 17:23:46 2017 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed May 10 14:38:48 2017 -0400

----------------------------------------------------------------------
 docs/dev/migration.md             | 300 +++++++++++++++++----------------
 docs/monitoring/best_practices.md |  30 ++--
 2 files changed, 171 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9fbd08b5/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index a5910a8..11eb42c 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -51,69 +51,70 @@ As running examples for the remainder of this document we will use the `CountMap
 functions. The first is an example of a function with **keyed** state, while
 the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below:
 
-    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
 
-        private transient ValueState<Integer> counter;
+    private transient ValueState<Integer> counter;
 
-        private final int numberElements;
+    private final int numberElements;
 
-        public CountMapper(int numberElements) {
-            this.numberElements = numberElements;
-        }
+    public CountMapper(int numberElements) {
+        this.numberElements = numberElements;
+    }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            counter = getRuntimeContext().getState(
-      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
-        }
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        counter = getRuntimeContext().getState(
+            new ValueStateDescriptor<>("counter", Integer.class, 0));
+    }
 
-        @Override
-        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-            int count = counter.value() + 1;
-      	    counter.update(count);
+    @Override
+    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+        int count = counter.value() + 1;
+        counter.update(count);
 
-      	    if (count % numberElements == 0) {
-     		    out.collect(Tuple2.of(value.f0, count));
-     		    counter.update(0); // reset to 0
-     	    }
+        if (count % numberElements == 0) {
+            out.collect(Tuple2.of(value.f0, count));
+            counter.update(0); // reset to 0
         }
     }
+}
 
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
 
-    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-	    private final int threshold;
+    private final int threshold;
 
-	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
+    private ArrayList<Tuple2<String, Integer>> bufferedElements;
 
-	    BufferingSink(int threshold) {
-		    this.threshold = threshold;
-		    this.bufferedElements = new ArrayList<>();
-	    }
+    BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-    	@Override
-	    public void invoke(Tuple2<String, Integer> value) throws Exception {
-		    bufferedElements.add(value);
-		    if (bufferedElements.size() == threshold) {
-			    for (Tuple2<String, Integer> element: bufferedElements) {
-				    // send it to the sink
-			    }
-			    bufferedElements.clear();
-		    }
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+	        // send it to the sink
 	    }
+	    bufferedElements.clear();
+	}
+    }
 
-	    @Override
-	    public ArrayList<Tuple2<String, Integer>> snapshotState(
-	            long checkpointId, long checkpointTimestamp) throws Exception {
-		    return bufferedElements;
-	    }
+    @Override
+    public ArrayList<Tuple2<String, Integer>> snapshotState(
+        long checkpointId, long checkpointTimestamp) throws Exception {
+	    return bufferedElements;
+    }
 
-	    @Override
-	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-	    	bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
 
 
 The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form
@@ -160,9 +161,11 @@ the [State documentation]({{ site.baseurl }}/dev/stream/state.html).
 
 The `ListCheckpointed` interface requires the implementation of two methods:
 
-    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
 
-    void restoreState(List<T> state) throws Exception;
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
 
 Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference
 is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and
@@ -170,53 +173,55 @@ is that now `snapshotState()` should return a list of objects to checkpoint, as
 return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The updated code for `BufferingSink`
 is included below:
 
-    public class BufferingSinkListCheckpointed implements
-            SinkFunction<Tuple2<String, Integer>>,
-            ListCheckpointed<Tuple2<String, Integer>>,
-            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSinkListCheckpointed implements
+        SinkFunction<Tuple2<String, Integer>>,
+        ListCheckpointed<Tuple2<String, Integer>>,
+        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
 
-        private final int threshold;
+    private final int threshold;
 
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
 
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
 
-        public BufferingSinkListCheckpointed(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSinkListCheckpointed(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            this.bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        this.bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
 
-        @Override
-        public List<Tuple2<String, Integer>> snapshotState(
-                long checkpointId, long timestamp) throws Exception {
-            return this.bufferedElements;
-        }
-
-        @Override
-        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
-            if (!state.isEmpty()) {
-                this.bufferedElements.addAll(state);
-            }
-        }
+    @Override
+    public List<Tuple2<String, Integer>> snapshotState(
+            long checkpointId, long timestamp) throws Exception {
+        return this.bufferedElements;
+    }
 
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-            // this is from the CheckpointedRestoring interface.
+    @Override
+    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
+        if (!state.isEmpty()) {
             this.bufferedElements.addAll(state);
         }
     }
 
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
+    }
+}
+{% endhighlight %}
+
 As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards
 compatibility reasons and more details will be explained at the end of this section.
 
@@ -224,9 +229,11 @@ compatibility reasons and more details will be explained at the end of this sect
 
 The `CheckpointedFunction` interface again requires the implementation of two methods:
 
-    void snapshotState(FunctionSnapshotContext context) throws Exception;
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
 
-    void initializeState(FunctionInitializationContext context) throws Exception;
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
 
 As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is
 the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only
@@ -234,57 +241,59 @@ in the case that we are recovering from a failure. Given this, `initializeState(
 types of state are initialized, but also where state recovery logic is included. An implementation of the
 `CheckpointedFunction` interface for `BufferingSink` is presented below.
 
-    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
 
-        private final int threshold;
+    private final int threshold;
 
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
 
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
 
-        public BufferingSink(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
 
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            checkpointedState.clear();
-            for (Tuple2<String, Integer> element : bufferedElements) {
-                checkpointedState.add(element);
-            }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkpointedState.clear();
+        for (Tuple2<String, Integer> element : bufferedElements) {
+            checkpointedState.add(element);
         }
+    }
 
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            checkpointedState = context.getOperatorStateStore().
-                getSerializableListState("buffered-elements");
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        checkpointedState = context.getOperatorStateStore().
+            getSerializableListState("buffered-elements");
 
-            if (context.isRestored()) {
-                for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                    bufferedElements.add(element);
-                }
+        if (context.isRestored()) {
+            for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                bufferedElements.add(element);
             }
         }
+    }
 
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-            // this is from the CheckpointedRestoring interface.
-            this.bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
 
 The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize
 the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects
@@ -305,40 +314,41 @@ for Flink 1.1. If the `CheckpointedFunction` interface was to be used in the `Co
 the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods
 would look like this:
 
-    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
-            implements CheckpointedFunction {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
+        implements CheckpointedFunction {
 
-        private transient ValueState<Integer> counter;
+    private transient ValueState<Integer> counter;
 
-        private final int numberElements;
+    private final int numberElements;
 
-        public CountMapper(int numberElements) {
-            this.numberElements = numberElements;
-        }
+    public CountMapper(int numberElements) {
+        this.numberElements = numberElements;
+    }
 
-        @Override
-        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
-            int count = counter.value() + 1;
-            counter.update(count);
+    @Override
+    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+        int count = counter.value() + 1;
+        counter.update(count);
 
-            if (count % numberElements == 0) {
-                out.collect(Tuple2.of(value.f0, count));
-             	counter.update(0); // reset to 0
-             	}
-            }
+        if (count % numberElements == 0) {
+            out.collect(Tuple2.of(value.f0, count));
+            counter.update(0); // reset to 0
         }
+    }
 
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            //all managed, nothing to do.
-        }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        // all managed, nothing to do.
+    }
 
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            counter = context.getKeyedStateStore().getState(
-                new ValueStateDescriptor<>("counter", Integer.class, 0));
-        }
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        counter = context.getKeyedStateStore().getState(
+            new ValueStateDescriptor<>("counter", Integer.class, 0));
     }
+}
+{% endhighlight %}
 
 Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
 upon checkpointing.

http://git-wip-us.apache.org/repos/asf/flink/blob/9fbd08b5/docs/monitoring/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/best_practices.md b/docs/monitoring/best_practices.md
index 0bd362e..5a8bcbc 100644
--- a/docs/monitoring/best_practices.md
+++ b/docs/monitoring/best_practices.md
@@ -59,8 +59,8 @@ ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
 This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line.
 {% highlight java %}
 public static void main(String[] args) {
-	ParameterTool parameter = ParameterTool.fromArgs(args);
-	// .. regular code ..
+    ParameterTool parameter = ParameterTool.fromArgs(args);
+    // .. regular code ..
 {% endhighlight %}
 
 
@@ -114,17 +114,18 @@ The example below shows how to pass the parameters as a `Configuration` object t
 
 {% highlight java %}
 ParameterTool parameters = ParameterTool.fromArgs(args);
-DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
+DataSet<Tuple2<String, Integer>> counts = text
+        .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
 {% endhighlight %}
 
 In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method:
 
 {% highlight java %}
 public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		parameters.getInteger("myInt", -1);
-		// .. do
+    @Override
+    public void open(Configuration parameters) throws Exception {
+	parameters.getInteger("myInt", -1);
+	// .. do
 {% endhighlight %}
 
 
@@ -147,11 +148,12 @@ Access them in any rich user function:
 {% highlight java %}
 public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
-	@Override
-	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-		ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
-		parameters.getRequired("input");
-		// .. do more ..
+    @Override
+    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+	ParameterTool parameters = (ParameterTool)
+	    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+	parameters.getRequired("input");
+	// .. do more ..
 {% endhighlight %}
 
 
@@ -198,8 +200,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MyClass implements MapFunction {
-	private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
-	// ...
+    private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
+    // ...
 {% endhighlight %}