Posted to commits@flink.apache.org by ch...@apache.org on 2018/05/03 21:05:22 UTC

[1/2] flink git commit: [FLINK-8726][docs] Fix and normalize code-highlighting

Repository: flink
Updated Branches:
  refs/heads/master 545d53074 -> b13c70b60


http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index b9ed8fb..f91a3d8 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -59,7 +59,7 @@ now clean of specific logger dependencies.
 Example and quickstart archetypes already have loggers specified and should not be affected.
 For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add:
 
-~~~xml
+{% highlight xml %}
 <dependency>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-log4j12</artifactId>
@@ -71,7 +71,7 @@ For other custom projects, make sure to add logger dependencies. For example, in
     <artifactId>log4j</artifactId>
     <version>1.2.17</version>
 </dependency>
-~~~
+{% endhighlight %}
 
 ## Migrating from Flink 1.1 to Flink 1.2
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/restart_strategies.md
----------------------------------------------------------------------
diff --git a/docs/dev/restart_strategies.md b/docs/dev/restart_strategies.md
index a4901ab..49419e1 100644
--- a/docs/dev/restart_strategies.md
+++ b/docs/dev/restart_strategies.md
@@ -107,9 +107,9 @@ In-between two consecutive restart attempts, the restart strategy waits a fixed
 
 This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.
 
-~~~
+{% highlight yaml %}
 restart-strategy: fixed-delay
-~~~
+{% endhighlight %}
 
 <table class="table table-bordered">
   <thead>
@@ -135,10 +135,10 @@ restart-strategy: fixed-delay
 
 For example:
 
-~~~
+{% highlight yaml %}
 restart-strategy.fixed-delay.attempts: 3
 restart-strategy.fixed-delay.delay: 10 s
-~~~
+{% endhighlight %}
 
 The fixed delay restart strategy can also be set programmatically:
 
@@ -172,9 +172,9 @@ In-between two consecutive restart attempts, the restart strategy waits a fixed
 
 This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.
 
-~~~
+{% highlight yaml %}
 restart-strategy: failure-rate
-~~~
+{% endhighlight %}
 
 <table class="table table-bordered">
   <thead>
@@ -203,11 +203,11 @@ restart-strategy: failure-rate
   </tbody>
 </table>
 
-~~~
+{% highlight yaml %}
 restart-strategy.failure-rate.max-failures-per-interval: 3
 restart-strategy.failure-rate.failure-rate-interval: 5 min
 restart-strategy.failure-rate.delay: 10 s
-~~~
+{% endhighlight %}
 
 The failure rate restart strategy can also be set programmatically:
 
@@ -240,9 +240,9 @@ env.setRestartStrategy(RestartStrategies.failureRateRestart(
 
 The job fails directly and no restart is attempted.
 
-~~~
+{% highlight yaml %}
 restart-strategy: none
-~~~
+{% endhighlight %}
 
 The no restart strategy can also be set programmatically:
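For reference, the programmatic variants mentioned above look roughly like the following Java sketch; the factory methods come from `org.apache.flink.api.common.restartstrategy.RestartStrategies`, and the argument values simply mirror the configuration examples above (3 attempts, 10 s delay, 5 min failure interval):

{% highlight java %}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Fixed delay: at most 3 restart attempts, waiting 10 s between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, Time.of(10, TimeUnit.SECONDS)));

// Failure rate: at most 3 failures per 5-minute interval, 10 s delay between attempts
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));

// No restart: the job fails directly
env.setRestartStrategy(RestartStrategies.noRestart());
{% endhighlight %}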
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/scala_shell.md
----------------------------------------------------------------------
diff --git a/docs/dev/scala_shell.md b/docs/dev/scala_shell.md
index b12060b..b8d2b2c 100644
--- a/docs/dev/scala_shell.md
+++ b/docs/dev/scala_shell.md
@@ -27,9 +27,9 @@ It can be used in a local setup as well as in a cluster setup.
 
 To use the shell with an integrated Flink cluster just execute:
 
-~~~bash
+{% highlight bash %}
 bin/start-scala-shell.sh local
-~~~
+{% endhighlight %}
 
 in the root directory of your Flink binary distribution. To run the Shell on a
 cluster, please see the Setup section below.
@@ -44,7 +44,7 @@ Use "benv" and "senv" to access the Batch and Streaming environment respectively
 
 The following example will execute the wordcount program in the Scala shell:
 
-~~~scala
+{% highlight scala %}
 Scala-Flink> val text = benv.fromElements(
   "To be, or not to be,--that is the question:--",
   "Whether 'tis nobler in the mind to suffer",
@@ -54,21 +54,21 @@ Scala-Flink> val counts = text
     .flatMap { _.toLowerCase.split("\\W+") }
     .map { (_, 1) }.groupBy(0).sum(1)
 Scala-Flink> counts.print()
-~~~
+{% endhighlight %}
 
 The print() command will automatically send the specified tasks to the JobManager for execution and will show the result of the computation in the terminal.
 
 It is possible to write results to a file. However, in this case you need to call `execute` to run your program:
 
-~~~scala
+{% highlight scala %}
 Scala-Flink> benv.execute("MyProgram")
-~~~
+{% endhighlight %}
 
 ### DataStream API
 
 Similar to the batch program above, we can execute a streaming program through the DataStream API:
 
-~~~scala
+{% highlight scala %}
 Scala-Flink> val textStreaming = senv.fromElements(
   "To be, or not to be,--that is the question:--",
   "Whether 'tis nobler in the mind to suffer",
@@ -79,7 +79,7 @@ Scala-Flink> val countsStreaming = textStreaming
     .map { (_, 1) }.keyBy(0).sum(1)
 Scala-Flink> countsStreaming.print()
 Scala-Flink> senv.execute("Streaming Wordcount")
-~~~
+{% endhighlight %}
 
 Note that in the Streaming case, the print operation does not trigger execution directly.
 
@@ -92,26 +92,26 @@ It is possible to add external classpaths to the Scala-shell. These will be sent
 
 Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
 
-~~~bash
+{% highlight bash %}
 bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
-~~~
+{% endhighlight %}
 
 
 ## Setup
 
 To get an overview of what options the Scala Shell provides, please use
 
-~~~bash
+{% highlight bash %}
 bin/start-scala-shell.sh --help
-~~~
+{% endhighlight %}
 
 ### Local
 
 To use the shell with an integrated Flink cluster just execute:
 
-~~~bash
+{% highlight bash %}
 bin/start-scala-shell.sh local
-~~~
+{% endhighlight %}
 
 
 ### Remote
@@ -119,9 +119,9 @@ bin/start-scala-shell.sh local
 To use it with a running cluster, start the Scala shell with the keyword `remote`
 and supply the host and port of the JobManager with:
 
-~~~bash
+{% highlight bash %}
 bin/start-scala-shell.sh remote <hostname> <portnumber>
-~~~
+{% endhighlight %}
 
 ### Yarn Scala Shell cluster
 
@@ -134,9 +134,9 @@ JobManager, name of YARN application, etc.
 For example, to start a Yarn cluster for the Scala Shell with two TaskManagers
 use the following:
 
-~~~bash
+{% highlight bash %}
  bin/start-scala-shell.sh yarn -n 2
-~~~
+{% endhighlight %}
 
 For all other options, see the full reference at the bottom.
 
@@ -146,14 +146,14 @@ For all other options, see the full reference at the bottom.
 If you have previously deployed a Flink cluster using the Flink Yarn Session,
 the Scala shell can connect to it using the following command:
 
-~~~bash
+{% highlight bash %}
  bin/start-scala-shell.sh yarn
-~~~
+{% endhighlight %}
 
 
 ## Full Reference
 
-~~~bash
+{% highlight bash %}
 Flink Scala Shell
 Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...
 
@@ -190,6 +190,6 @@ Starts Flink scala shell connecting to a yarn cluster
         The configuration directory.
   -h | --help
         Prints this usage text
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 27d79af..ce0e1d6 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -113,7 +113,7 @@ Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/referenc
 
 The following BNF grammar describes the superset of supported SQL features in batch and streaming queries. The [Operations](#operations) section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
 
-```
+{% highlight sql %}
 
 query:
   values
@@ -180,7 +180,7 @@ insert:
   INSERT INTO tableReference
   query
 
-```
+{% endhighlight %}
 
 Flink SQL uses a lexical policy for identifiers (table, attribute, function names) similar to Java:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/table/streaming.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/streaming.md b/docs/dev/table/streaming.md
index dc0fdf8..f8f19c0 100644
--- a/docs/dev/table/streaming.md
+++ b/docs/dev/table/streaming.md
@@ -84,13 +84,13 @@ The following figure visualizes the relationship of streams, dynamic tables, and
 
 In the following, we will explain the concepts of dynamic tables and continuous queries with a stream of click events that have the following schema:
 
-```
+{% highlight plain %}
 [ 
   user:  VARCHAR,   // the name of the user
   cTime: TIMESTAMP, // the time when the URL was accessed
   url:   VARCHAR    // the URL that was accessed by the user
 ]
-```
+{% endhighlight %}
 
 ### Defining a Table on a Stream
 
@@ -557,9 +557,9 @@ Many queries aggregate or join records on one or more key attributes. When such
 
 For example, the following query computes the number of clicks per session.
 
-```
+{% highlight sql %}
 SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
-```
+{% endhighlight %}
 
 The `sessionId` attribute is used as a grouping key and the continuous query maintains a count for each `sessionId` it observes. The `sessionId` attribute evolves over time and `sessionId` values are only active until the session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of `sessionId` and expects that every `sessionId` value can occur at any point in time. It maintains a count for each observed `sessionId` value. Consequently, the total state size of the query grows continuously as more and more `sessionId` values are observed.
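As a rough illustration of how such state can be kept bounded, the aggregation could additionally be grouped by a time window so that per-`sessionId` counters can be discarded once their window is complete. A minimal Java sketch, assuming a table `clicks` with an event-time attribute `cTime` (and the `sessionId` field used above) is registered in a `StreamTableEnvironment` named `tableEnv`:

{% highlight java %}
import org.apache.flink.table.api.Table;

// Counting clicks per sessionId and per hourly tumbling window bounds the
// lifetime of each counter: state for a window can be dropped once it is complete.
Table hourlyCounts = tableEnv.sqlQuery(
    "SELECT sessionId, TUMBLE_END(cTime, INTERVAL '1' HOUR) AS endT, COUNT(*) AS cnt " +
    "FROM clicks " +
    "GROUP BY TUMBLE(cTime, INTERVAL '1' HOUR), sessionId");
{% endhighlight %}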
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/monitoring/application_profiling.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/application_profiling.md b/docs/monitoring/application_profiling.md
index 721bc31..b9cec5b 100644
--- a/docs/monitoring/application_profiling.md
+++ b/docs/monitoring/application_profiling.md
@@ -40,17 +40,17 @@ Java Flight Recorder is a profiling and event collection framework built into th
 is an advanced set of tools that enables efficient and detailed analysis of the extensive data collected by Java
 Flight Recorder. Example configuration:
 
-~~~
+{% highlight yaml %}
 env.java.opts: "-XX:+UnlockCommercialFeatures -XX:+UnlockDiagnosticVMOptions -XX:+FlightRecorder -XX:+DebugNonSafepoints -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=${FLINK_LOG_PREFIX}.jfr"
-~~~
+{% endhighlight %}
 
 # Profiling with JITWatch
 
 [JITWatch](https://github.com/AdoptOpenJDK/jitwatch/wiki) is a log analyser and visualizer for the Java HotSpot JIT
 compiler used to inspect inlining decisions, hot methods, bytecode, and assembly. Example configuration:
 
-~~~
+{% highlight yaml %}
 env.java.opts: "-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation -XX:LogFile=${FLINK_LOG_PREFIX}.jit -XX:+PrintAssembly"
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/monitoring/checkpoint_monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/checkpoint_monitoring.md b/docs/monitoring/checkpoint_monitoring.md
index 25ed3dd..0a66ca5 100644
--- a/docs/monitoring/checkpoint_monitoring.md
+++ b/docs/monitoring/checkpoint_monitoring.md
@@ -68,10 +68,10 @@ The checkpoint history keeps statistics about recently triggered checkpoints, in
 
 You can configure the number of recent checkpoints that are remembered for the history via the following configuration key. The default is `10`.
 
-```sh
+{% highlight yaml %}
 # Number of recent checkpoints that are remembered
 jobmanager.web.checkpoints.history: 15
-```
+{% endhighlight %}
 
 ### Summary Tab
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/monitoring/historyserver.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md
index 9c68e65..7f8f737 100644
--- a/docs/monitoring/historyserver.md
+++ b/docs/monitoring/historyserver.md
@@ -35,10 +35,10 @@ The HistoryServer allows you to query the status and statistics of completed job
 
 After you have configured the HistoryServer *and* JobManager, you start and stop the HistoryServer via its corresponding startup script:
 
-```sh
+{% highlight shell %}
 # Start or stop the HistoryServer
 bin/historyserver.sh (start|start-foreground|stop)
-```
+{% endhighlight %}
 
 By default, this server binds to `localhost` and listens at port `8082`.
 
@@ -52,22 +52,22 @@ The configuration keys `jobmanager.archive.fs.dir` and `historyserver.archive.fs
 
 The archiving of completed jobs happens on the JobManager, which uploads the archived job information to a file system directory. You can configure the directory to archive completed jobs in `flink-conf.yaml` by setting a directory via `jobmanager.archive.fs.dir`.
 
-```sh
+{% highlight yaml %}
 # Directory to upload completed job information
 jobmanager.archive.fs.dir: hdfs:///completed-jobs
-```
+{% endhighlight %}
 
 **HistoryServer**
 
 The HistoryServer can be configured to monitor a comma-separated list of directories via `historyserver.archive.fs.dir`. The configured directories are regularly polled for new archives; the polling interval can be configured via `historyserver.archive.fs.refresh-interval`.
 
-```sh
+{% highlight yaml %}
 # Monitor the following directories for completed jobs
 historyserver.archive.fs.dir: hdfs:///completed-jobs
 
 # Refresh every 10 seconds
 historyserver.archive.fs.refresh-interval: 10000
-```
+{% endhighlight %}
 
 The contained archives are downloaded and cached in the local filesystem. The local directory for this is configured via `historyserver.web.tmpdir`.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/monitoring/logging.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/logging.md b/docs/monitoring/logging.md
index b548d41..737ddb9 100644
--- a/docs/monitoring/logging.md
+++ b/docs/monitoring/logging.md
@@ -46,7 +46,7 @@ The configuration file either has to be specified by setting the environment pro
 The `conf` directory contains a `logback.xml` file which can be modified and is used if Flink is started outside of an IDE and with the provided starting scripts.
 The provided `logback.xml` has the following form:
 
-~~~ xml
+{% highlight xml %}
 <configuration>
     <appender name="file" class="ch.qos.logback.core.FileAppender">
         <file>${log.file}</file>
@@ -60,13 +60,13 @@ The provided `logback.xml` has the following form:
         <appender-ref ref="file"/>
     </root>
 </configuration>
-~~~
+{% endhighlight %}
 
 In order to control the logging level of `org.apache.flink.runtime.jobgraph.JobGraph`, for example, one would have to add the following line to the configuration file.
 
-~~~ xml
+{% highlight xml %}
 <logger name="org.apache.flink.runtime.jobgraph.JobGraph" level="DEBUG"/>
-~~~
+{% endhighlight %}
 
 For further information on configuring logback see [LOGback's manual](http://logback.qos.ch/manual/configuration.html).
 
@@ -74,27 +74,27 @@ For further information on configuring logback see [LOGback's manual](http://log
 
 The loggers using slf4j are created by calling
 
-~~~ java
+{% highlight java %}
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
 Logger LOG = LoggerFactory.getLogger(Foobar.class);
-~~~
+{% endhighlight %}
 
 In order to benefit most from slf4j, it is recommended to use its placeholder mechanism.
 Using placeholders avoids unnecessary string construction when the logging level is set so high that the message would not be logged.
 The syntax of placeholders is the following:
 
-~~~ java
+{% highlight java %}
 LOG.info("This message contains {} placeholders. {}", 2, "Yippie");
-~~~
+{% endhighlight %}
 
 Placeholders can also be used in conjunction with exceptions which shall be logged.
 
-~~~ java
+{% highlight java %}
 catch(Exception exception){
 	LOG.error("An {} occurred.", "error", exception);
 }
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index d0b98fb..e11b480 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -569,7 +569,7 @@ we will list more settings specific to each reporter.
 
 Example reporter configuration that specifies multiple reporters:
 
-```
+{% highlight yaml %}
 metrics.reporters: my_jmx_reporter,my_other_reporter
 
 metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
@@ -579,7 +579,7 @@ metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.Grap
 metrics.reporter.my_other_reporter.host: 192.168.1.1
 metrics.reporter.my_other_reporter.port: 10000
 
-```
+{% endhighlight %}
 
 **Important:** The jar containing the reporter must be accessible when Flink is started; place it in the `/lib` folder.
 
@@ -1423,7 +1423,7 @@ Request a list of available metrics:
 
 `GET /jobmanager/metrics`
 
-~~~
+{% highlight json %}
 [
   {
     "id": "metric1"
@@ -1432,13 +1432,13 @@ Request a list of available metrics:
     "id": "metric2"
   }
 ]
-~~~
+{% endhighlight %}
 
 Request the values for specific (unaggregated) metrics:
 
 `GET taskmanagers/ABCDE/metrics?get=metric1,metric2`
 
-~~~
+{% highlight json %}
 [
   {
     "id": "metric1",
@@ -1449,13 +1449,13 @@ Request the values for specific (unaggregated) metrics:
     "value": "2"
   }
 ]
-~~~
+{% endhighlight %}
 
 Request aggregated values for specific metrics:
 
 `GET /taskmanagers/metrics?get=metric1,metric2`
 
-~~~
+{% highlight json %}
 [
   {
     "id": "metric1",
@@ -1472,13 +1472,13 @@ Request aggregated values for specific metrics:
     "sum": 16
   }
 ]
-~~~
+{% endhighlight %}
 
 Request specific aggregated values for specific metrics:
 
 `GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max`
 
-~~~
+{% highlight json %}
 [
   {
     "id": "metric1",
@@ -1491,7 +1491,7 @@ Request specific aggregated values for specific metrics:
     "max": 14,
   }
 ]
-~~~
+{% endhighlight %}
 
 ## Dashboard integration
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/monitoring/rest_api.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/rest_api.md b/docs/monitoring/rest_api.md
index c160ad0..ae4f38b 100644
--- a/docs/monitoring/rest_api.md
+++ b/docs/monitoring/rest_api.md
@@ -85,7 +85,7 @@ Some information about the monitoring API and the server setup.
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "refresh-interval": 3000,
   "timezone-offset": 3600000,
@@ -93,7 +93,7 @@ Sample Result:
   "flink-version": "{{ site.version }}",
   "flink-revision": "8124545 @ 16.09.2015 @ 15:38:42 CEST"
 }
-~~~
+{% endhighlight %}
 
 **`/overview`**
 
@@ -101,7 +101,7 @@ Simple summary of the Flink cluster status.
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "taskmanagers": 17,
   "slots-total": 68,
@@ -111,7 +111,7 @@ Sample Result:
   "jobs-cancelled": 1,
   "jobs-failed": 0
 }
-~~~
+{% endhighlight %}
 
 ### Overview of Jobs
 
@@ -121,7 +121,7 @@ Jobs, grouped by status, each with a small summary of its status.
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "jobs":[
     {
@@ -148,7 +148,7 @@ Sample Result:
       ...
     }]
 }
-~~~
+{% endhighlight %}
 
 ### Details of a Running or Completed Job
 
@@ -158,7 +158,7 @@ Summary of one job, listing dataflow plan, status, timestamps of state transitio
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "jid": "ab78dcdbb1db025539e30217ec54ee16",
   "name": "WordCount Example",
@@ -201,7 +201,7 @@ Sample Result:
     // see plan details below
   }
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/vertices`**
 
@@ -214,7 +214,7 @@ The user-defined execution config used by the job.
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "jid": "ab78dcdbb1db025539e30217ec54ee16",
   "name": "WordCount Example",
@@ -225,7 +225,7 @@ Sample Result:
     "object-reuse-mode": false
   }
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/exceptions`**
 
@@ -234,7 +234,7 @@ The `truncated` flag defines whether more exceptions occurred, but are not liste
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "root-exception": "java.io.IOException: File already exists:/tmp/abzs/2\n\tat org.apache.flink.core.fs.local.LocalFileSystem. ...",
   "all-exceptions": [ {
@@ -248,7 +248,7 @@ Sample Result:
   } ],
   "truncated":false
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/accumulators`**
 
@@ -256,7 +256,7 @@ The aggregated user accumulators plus job accumulators.
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "job-accumulators":[],
   "user-task-accumulators": [ {
@@ -270,7 +270,7 @@ Sample Result:
     "value": "LongCounter 37500000"
   } ]
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/vertices/<vertexid>`**
 
@@ -278,7 +278,7 @@ Information about one specific vertex, with a summary for each of its subtasks.
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "id": "dceafe2df1f57a1206fcb907cb38ad97",
   "name": "CHAIN DataSource -> Map -> FlatMap -> Combine(SUM(1))",
@@ -308,7 +308,7 @@ Sample Result:
     }
   } ]
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/vertices/<vertexid>/subtasktimes`**
 
@@ -317,7 +317,7 @@ These can be used, for example, to create time-line comparisons between subtasks
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "id": "dceafe2df1f57a1206fcb907cb38ad97",
   "name": "CHAIN DataSource -> Map -> Combine(SUM(1))",
@@ -338,7 +338,7 @@ Sample Result:
     }
   } ]
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/vertices/<vertexid>/taskmanagers`**
 
@@ -346,7 +346,7 @@ TaskManager statistics for one specific vertex. This is an aggregation of subtas
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "id": "fe20bcc29b87cdc76589ca42114c2499",
   "name": "Reduce (SUM(1), at main(WordCount.java:72)",
@@ -377,7 +377,7 @@ Sample Result:
     }
   } ]
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/vertices/<vertexid>/accumulators`**
 
@@ -385,7 +385,7 @@ The aggregated user-defined accumulators, for a specific vertex.
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "id": "dceafe2df1f57a1206fcb907cb38ad97",
   "user-accumulators": [ {
@@ -394,7 +394,7 @@ Sample Result:
     "name": "genwords", "type": "LongCounter", "value": "LongCounter 75000000"
   } ]
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators`**
 
@@ -403,7 +403,7 @@ request `/jobs/<jobid>/vertices/<vertexid>/accumulators`.
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "id": "dceafe2df1f57a1206fcb907cb38ad97",
   "parallelism": 2,
@@ -427,7 +427,7 @@ Sample Result:
     } ]
   } ]
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>`**
 
@@ -440,7 +440,7 @@ Summary of a specific execution attempt of a specific subtask. Multiple executio
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "subtask": 0,
   "status": "FINISHED",
@@ -453,7 +453,7 @@ Sample Result:
     "read-bytes": 0, "write-bytes": 12684375, "read-records": 0, "write-records": 1153125
   }
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators`**
 
@@ -461,7 +461,7 @@ The accumulators collected for one specific subtask during one specific executio
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "subtask": 0,
   "attempt": 0,
@@ -474,7 +474,7 @@ Sample Result:
   "name": "avglen", "type": "DoubleCounter", "value": "DoubleCounter 102.527162"
   } ]
 }
-~~~
+{% endhighlight %}
 
 **`/jobs/<jobid>/plan`**
 
@@ -482,7 +482,7 @@ The dataflow plan of a job. The plan is also included in the job summary (`/jobs
 
 Sample Result:
 
-~~~
+{% highlight json %}
 {
   "jid":"ab78dcdbb1db025539e30217ec54ee16",
   "name":"WordCount Example",
@@ -558,7 +558,7 @@ Sample Result:
     }
   } ]
 }
-~~~
+{% endhighlight %}
 
 ### Job Cancellation
 
@@ -580,54 +580,54 @@ Since savepoints can take some time to complete this operation happens asynchron
 
 Sample Trigger Result:
 
-~~~
+{% highlight json %}
 {
   "status": "accepted",
   "request-id": 1,
   "location": "/jobs/:jobid/cancel-with-savepoint/in-progress/1"
 }
-~~~
+{% endhighlight %}
 
 ##### Monitoring Progress
 
 The progress of the cancellation has to be monitored by the user at
 
-~~~
+{% highlight json %}
 /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId
-~~~
+{% endhighlight %}
 
 The request ID is returned by the trigger result.
 
 ###### In-Progress
 
-~~~
+{% highlight json %}
 {
   "status": "in-progress",
   "request-id": 1
 }
-~~~
+{% endhighlight %}
 
 ###### Success
 
-~~~
+{% highlight json %}
 {
   "status": "success",
   "request-id": 1,
   "savepoint-path": "<savepointPath>"
 }
-~~~
+{% endhighlight %}
 
 The `savepointPath` points to the external path of the savepoint, which can be used to resume the job from the savepoint.
 
 ###### Failed
 
-~~~
+{% highlight json %}
 {
   "status": "failed",
   "request-id": 1,
   "cause": "<error message>"
 }
-~~~
+{% endhighlight %}
 
 ### Submitting Programs
 
@@ -640,11 +640,11 @@ Also make sure that the multi-part data includes the `Content-Type` of the file
 
 The multi-part payload should start like
 
-```
+{% highlight plain %}
 ------BoundaryXXXX
 Content-Disposition: form-data; name="jarfile"; filename="YourFileName.jar"
 Content-Type: application/x-java-archive
-```
+{% endhighlight %}
 
 #### Run a Program (POST)
 
@@ -664,15 +664,15 @@ If the call succeeds, you will get a response with the ID of the submitted job.
 
 Request:
 
-~~~
+{% highlight bash %}
 POST: /jars/MyProgram.jar/run?savepointPath=/my-savepoints/savepoint-1bae02a80464&allowNonRestoredState=true
-~~~
+{% endhighlight %}
 
 Response:
 
-~~~
+{% highlight json %}
 {"jobid": "869a9868d49c679e7355700e0857af85"}
-~~~
+{% endhighlight %}
 
 ### Dispatcher
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/cli.md
----------------------------------------------------------------------
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 45fe799..dd06ac2 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -208,7 +208,7 @@ Otherwise, you will run into a `ClassNotFoundException`.
 
 The command line syntax is as follows:
 
-~~~
+{% highlight bash %}
 ./flink <ACTION> [OPTIONS] [ARGUMENTS]
 
 The following actions are available:
@@ -364,6 +364,6 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on
                                    in the configuration.
   Options for yarn-cluster mode:
      -yid,--yarnapplicationId <arg>   Attach to running YARN session
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index d5d81f5..9030bf7 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -443,9 +443,9 @@ active at the same time. Since the *intra-node-parallelism* is typically the num
 more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently
 boils down to
 
-```
+{% highlight plain %}
 #slots-per-TM^2 * #TMs * 4
-```
+{% endhighlight %}
 
 Where `#slots-per-TM` is the [number of slots per TaskManager](#configuring-taskmanager-processing-slots) and `#TMs` is the total number of task managers.
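For instance, with 20 TaskManagers of 8 slots each, this rule of thumb yields roughly 8^2 * 20 * 4 = 5120 network buffers.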
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/deployment/aws.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/aws.md b/docs/ops/deployment/aws.md
index 7ef95e7..db69b2e 100644
--- a/docs/ops/deployment/aws.md
+++ b/docs/ops/deployment/aws.md
@@ -58,9 +58,9 @@ After creating your cluster, you can [connect to the master node](http://docs.aw
 1. Go to the [Downloads Page]({{ site.download_url }}) and **download a binary version of Flink matching the Hadoop version** of your EMR cluster, e.g. Hadoop 2.7 for EMR releases 4.3.0, 4.4.0, or 4.5.0.
 2. Extract the Flink distribution and you are ready to deploy [Flink jobs via YARN](yarn_setup.html) after **setting the Hadoop config directory**:
 
-```bash
+{% highlight bash %}
 HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar
-```
+{% endhighlight %}
 
 {% top %}
 
@@ -70,13 +70,13 @@ HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/
 
 You can use S3 objects like regular files by specifying paths in the following format:
 
-```
+{% highlight plain %}
 s3://<your-bucket>/<endpoint>
-```
+{% endhighlight %}
 
 The endpoint can either be a single file or a directory, for example:
 
-```java
+{% highlight java %}
 // Read from S3 bucket
 env.readTextFile("s3://<bucket>/<endpoint>");
 
@@ -85,7 +85,7 @@ stream.writeAsText("s3://<bucket>/<endpoint>");
 
 // Use S3 as FsStatebackend
 env.setStateBackend(new FsStateBackend("s3://<your-bucket>/<endpoint>"));
-```
+{% endhighlight %}
 
 Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup](../jobmanager_high_availability.html) or the [RocksDBStateBackend]({{ site.baseurl }}/ops/state/state_backends.html#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI.
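For instance, the RocksDB state backend mentioned above can be pointed at S3 in the same way; a minimal sketch with placeholder bucket and endpoint:

{% highlight java %}
// Use S3 for RocksDB checkpoint data; <your-bucket>/<endpoint> are placeholders
env.setStateBackend(new RocksDBStateBackend("s3://<your-bucket>/<endpoint>"));
{% endhighlight %}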
 
@@ -101,9 +101,9 @@ implementation. Both ways are described below.
 To use either `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the
 `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g.
 
-```
+{% highlight bash %}
 cp ./opt/flink-s3-fs-presto-{{ site.version }}.jar ./lib/
-```
+{% endhighlight %}
 
 #### Configure Access Credentials
 
@@ -121,10 +121,10 @@ Access to S3 can be granted via your **access and secret key pair**. Please note
 
 You need to configure both `s3.access-key` and `s3.secret-key` in Flink's `flink-conf.yaml`:
 
-```
+{% highlight yaml %}
 s3.access-key: your-access-key
 s3.secret-key: your-secret-key
-```
+{% endhighlight %}
 
 {% top %}
 
@@ -149,7 +149,7 @@ This is the recommended S3 FileSystem implementation to use. It uses Amazon's SD
 
 You need to point Flink to a valid Hadoop configuration, which contains the following properties in `core-site.xml`:
 
-```xml
+{% highlight xml %}
 <configuration>
 
 <property>
@@ -165,7 +165,7 @@ You need to point Flink to a valid Hadoop configuration, which contains the foll
 </property>
 
 </configuration>
-```
+{% endhighlight %}
 
 This registers `S3AFileSystem` as the default FileSystem for URIs with the `s3a://` scheme.
 
@@ -175,12 +175,12 @@ This file system is limited to files up to 5GB in size and it does not work with
 
 You need to point Flink to a valid Hadoop configuration, which contains the following property in `core-site.xml`:
 
-```xml
+{% highlight xml %}
 <property>
   <name>fs.s3.impl</name>
   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
 </property>
-```
+{% endhighlight %}
 
 This registers `NativeS3FileSystem` as the default FileSystem for URIs with the `s3://` scheme.
 
@@ -192,9 +192,9 @@ You can specify the [Hadoop configuration](../config.html#hdfs) in various ways
 the path of the Hadoop configuration directory, for example
 - by setting the environment variable `HADOOP_CONF_DIR`, or
 - by setting the `fs.hdfs.hadoopconf` configuration option in `flink-conf.yaml`:
-```
+{% highlight yaml %}
 fs.hdfs.hadoopconf: /path/to/etc/hadoop
-```
+{% endhighlight %}
 
 This registers `/path/to/etc/hadoop` as Hadoop's configuration directory with Flink. Flink will look for the `core-site.xml` and `hdfs-site.xml` files in the specified directory.
 
@@ -222,7 +222,7 @@ Access to S3 can be granted via your **access and secret key pair**. Please note
 
 For `S3AFileSystem` you need to configure both `fs.s3a.access.key` and `fs.s3a.secret.key` in Hadoop's `core-site.xml`:
 
-```xml
+{% highlight xml %}
 <property>
   <name>fs.s3a.access.key</name>
   <value></value>
@@ -232,7 +232,7 @@ For `S3AFileSystem` you need to configure both `fs.s3a.access.key` and `fs.s3a.s
   <name>fs.s3a.secret.key</name>
   <value></value>
 </property>
-```
+{% endhighlight %}
 
 {% top %}
 
@@ -242,7 +242,7 @@ Access to S3 can be granted via your **access and secret key pair**. But this is
 
 For `NativeS3FileSystem` you need to configure both `fs.s3.awsAccessKeyId` and `fs.s3.awsSecretAccessKey` in Hadoop's `core-site.xml`:
 
-```xml
+{% highlight xml %}
 <property>
   <name>fs.s3.awsAccessKeyId</name>
   <value></value>
@@ -252,7 +252,7 @@ For `NativeS3FileSystem` you need to configure both `fs.s3.awsAccessKeyId` and `
   <name>fs.s3.awsSecretAccessKey</name>
   <value></value>
 </property>
-```
+{% endhighlight %}
 
 {% top %}
 
@@ -320,7 +320,7 @@ The following sections lists common issues when working with Flink on AWS.
 
 If your job submission fails with an Exception message noting that `No file system found with scheme s3`, this means that no FileSystem has been configured for S3. Please check out the configuration sections for our [shaded Hadoop/Presto](#shaded-hadooppresto-s3-file-systems-recommended) or [generic Hadoop](#set-s3-filesystem) file systems for details on how to configure this properly.
 
-```
+{% highlight plain %}
 org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
   Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
 Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error:
@@ -332,7 +332,7 @@ Caused by: java.io.IOException: No file system found with scheme s3,
     at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
     at o.a.f.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
     at o.a.f.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
-```
+{% endhighlight %}
 
 {% top %}
 
@@ -340,7 +340,7 @@ Caused by: java.io.IOException: No file system found with scheme s3,
 
 If you see your job failing with an Exception noting that the `AWS Access Key ID and Secret Access Key must be specified as the username or password`, your access credentials have not been set up properly. Please refer to the access credential section for our [shaded Hadoop/Presto](#configure-access-credentials) or [generic Hadoop](#configure-access-credentials-1) file systems for details on how to configure this.
 
-```
+{% highlight plain %}
 org.apache.flink.client.program.ProgramInvocationException: The program execution failed:
   Failed to submit job cd927567a81b62d7da4c18eaa91c3c39 (WordCount Example) [...]
 Caused by: java.io.IOException: The given file URI (s3://<bucket>/<endpoint>) points to the
@@ -362,7 +362,7 @@ Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Acce
     at o.a.h.fs.s3native.$Proxy6.initialize(Unknown Source)
     at o.a.h.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:330)
     at o.a.f.runtime.fs.hdfs.HadoopFileSystem.initialize(HadoopFileSystem.java:321)
-```
+{% endhighlight %}
 
 {% top %}
 
@@ -370,7 +370,7 @@ Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Acce
 
 If you see this Exception, the S3 FileSystem is not part of the class path of Flink. Please refer to [S3 FileSystem dependency section](#provide-s3-filesystem-dependency) for details on how to configure this properly.
 
-```
+{% highlight plain %}
 Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
   at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2186)
   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
@@ -389,7 +389,7 @@ Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native
   at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2060)
   at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2152)
   ... 33 more
-```
+{% endhighlight %}
 
 {% top %}
 
@@ -397,16 +397,16 @@ Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native
 
 If you have configured everything properly, but get a `Bad Request` Exception **and** your S3 bucket is located in region `eu-central-1`, you might be running an S3 client, which does not support [Amazon's signature version 4](http://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html).
 
-```
+{% highlight plain %}
 [...]
 Caused by: java.io.IOException: s3://<bucket-in-eu-central-1>/<endpoint> : 400 : Bad Request [...]
 Caused by: org.jets3t.service.impl.rest.HttpException [...]
-```
+{% endhighlight %}
 or
-```
+{% highlight plain %}
 com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: [...], AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: [...]
 
-```
+{% endhighlight %}
 
 This should not apply to our shaded Hadoop/Presto S3 file systems but can occur for Hadoop-provided
 S3 file systems. In particular, all Hadoop versions up to 2.7.2 running `NativeS3FileSystem` (which
@@ -417,9 +417,9 @@ Except for changing the bucket region, you may also be able to solve this by
 [requesting signature version 4 for request authentication](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingAWSSDK.html#specify-signature-version),
 e.g. by adding this to Flink's JVM options in `flink-conf.yaml` (see
 [configuration](../config.html#common-options)):
-```
+{% highlight yaml %}
 env.java.opts: -Dcom.amazonaws.services.s3.enableV4
-```
+{% endhighlight %}
 
 {% top %}
 
@@ -427,7 +427,7 @@ env.java.opts: -Dcom.amazonaws.services.s3.enableV4
 
 This Exception is usually caused by skipping the local buffer directory configuration `fs.s3a.buffer.dir` for the `S3AFileSystem`. Please refer to the [S3AFileSystem configuration](#s3afilesystem-recommended) section to see how to configure the `S3AFileSystem` properly.
 
-```
+{% highlight plain %}
 [...]
 Caused by: java.lang.NullPointerException at
 o.a.h.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268) at
@@ -442,6 +442,6 @@ o.a.h.fs.FileSystem.create(FileSystem.java:785) at
 o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404) at
 o.a.f.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48) at
 ... 25 more
-```
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/deployment/cluster_setup.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/cluster_setup.md b/docs/ops/deployment/cluster_setup.md
index c8a5d0b..75d0c2e 100644
--- a/docs/ops/deployment/cluster_setup.md
+++ b/docs/ops/deployment/cluster_setup.md
@@ -59,10 +59,10 @@ Go to the [downloads page]({{ site.download_url }}) and get the ready-to-run pac
 
 After downloading the latest release, copy the archive to your master node and extract it:
 
-~~~bash
+{% highlight bash %}
 tar xzf flink-*.tgz
 cd flink-*
-~~~
+{% endhighlight %}
 
 ### Configuring Flink
 
@@ -122,9 +122,9 @@ The following script starts a JobManager on the local node and connects via SSH
 
 Assuming that you are on the master node and inside the Flink directory:
 
-~~~bash
+{% highlight bash %}
 bin/start-cluster.sh
-~~~
+{% endhighlight %}
 
 To stop Flink, there is also a `stop-cluster.sh` script.
 
@@ -136,15 +136,15 @@ You can add both JobManager and TaskManager instances to your running cluster wi
 
 #### Adding a JobManager
 
-~~~bash
+{% highlight bash %}
 bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
-~~~
+{% endhighlight %}
 
 #### Adding a TaskManager
 
-~~~bash
+{% highlight bash %}
 bin/taskmanager.sh start|start-foreground|stop|stop-all
-~~~
+{% endhighlight %}
 
 Make sure to call these scripts on the hosts on which you want to start/stop the respective instance.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/deployment/hadoop.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/hadoop.md b/docs/ops/deployment/hadoop.md
index cd3c096..f2f060c 100644
--- a/docs/ops/deployment/hadoop.md
+++ b/docs/ops/deployment/hadoop.md
@@ -40,8 +40,8 @@ running inside YARN will be started with the Hadoop classpaths, but it can
 happen that the Hadoop dependencies must be in the classpath when submitting a
 job to YARN. For this, it's usually enough to do a
 
-```
+{% highlight bash %}
 export HADOOP_CLASSPATH=`hadoop classpath`
-```
+{% endhighlight %}
 
 in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath.

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/deployment/mapr_setup.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/mapr_setup.md b/docs/ops/deployment/mapr_setup.md
index 19920ad..37b192f 100644
--- a/docs/ops/deployment/mapr_setup.md
+++ b/docs/ops/deployment/mapr_setup.md
@@ -41,11 +41,11 @@ In order to run Flink on MapR, Flink needs to be built with MapR's own
 Hadoop and Zookeeper distribution. Simply build Flink using Maven with
 the following command from the project root directory:
 
-```
+{% highlight bash %}
 mvn clean install -DskipTests -Pvendor-repos,mapr \
     -Dhadoop.version=2.7.0-mapr-1607 \
     -Dzookeeper.version=3.4.5-mapr-1604
-```
+{% endhighlight %}
 
 The `vendor-repos` build profile adds MapR's repository to the build so that
 MapR's Hadoop / Zookeeper dependencies can be fetched. The `mapr` build
@@ -67,13 +67,13 @@ The client submitting Flink jobs to MapR also needs to be prepared with the belo
 
 Ensure that MapR's JAAS config file is picked up to avoid login failures:
 
-```
+{% highlight bash %}
 export JVM_ARGS=-Djava.security.auth.login.config=/opt/mapr/conf/mapr.login.conf
-```
+{% endhighlight %}
 
 Make sure that the `yarn.nodemanager.resource.cpu-vcores` property is set in `yarn-site.xml`:
 
-~~~xml
+{% highlight xml %}
 <!-- in /opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/yarn-site.xml -->
 
 <configuration>
@@ -86,28 +86,28 @@ Make sure that the `yarn.nodemanager.resource.cpu-vcores` property is set in `ya
 
 ...
 </configuration>
-~~~
+{% endhighlight %}
 
 Also remember to set the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment
 variables to the path where `yarn-site.xml` is located:
 
-```
+{% highlight bash %}
 export YARN_CONF_DIR=/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/
 export HADOOP_CONF_DIR=/opt/mapr/hadoop/hadoop-2.7.0/etc/hadoop/
-```
+{% endhighlight %}
 
 Make sure that the MapR native libraries are picked up in the classpath:
 
-```
+{% highlight bash %}
 export FLINK_CLASSPATH=/opt/mapr/lib/*
-```
+{% endhighlight %}
 
 If you'll be starting Flink on YARN sessions with `yarn-session.sh`, the
 following is also required:
 
-```
+{% highlight bash %}
 export CC_CLASSPATH=/opt/mapr/lib/*
-```
+{% endhighlight %}
 
 ## Running Flink with a Secured MapR Cluster
 
 Users simply need to log in using MapR's `maprlogin` authentication
 utility. Users that haven't acquired MapR login credentials will not be
 able to submit Flink jobs, failing with:
 
-```
+{% highlight plain %}
 java.lang.Exception: unable to establish the security context
 Caused by: o.a.f.r.security.modules.SecurityModule$SecurityInstallException: Unable to set the Hadoop login user
 Caused by: java.io.IOException: failure to login: Unable to obtain MapR credentials
-```
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/deployment/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/yarn_setup.md b/docs/ops/deployment/yarn_setup.md
index 0fb5bf6..d2fdad9 100644
--- a/docs/ops/deployment/yarn_setup.md
+++ b/docs/ops/deployment/yarn_setup.md
@@ -32,14 +32,14 @@ under the License.
 
 Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):
 
-~~~bash
+{% highlight bash %}
 # get the hadoop2 package from the Flink download page at
 # {{ site.download_url }}
 curl -O <flink_hadoop2_download_url>
 tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
 cd flink-{{ site.version }}/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
-~~~
+{% endhighlight %}
 
 Specify the `-s` flag for the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.
 
@@ -47,14 +47,14 @@ Once the session has been started, you can submit jobs to the cluster using the
 
 ### Run a Flink job on YARN
 
-~~~bash
+{% highlight bash %}
 # get the hadoop2 package from the Flink download page at
 # {{ site.download_url }}
 curl -O <flink_hadoop2_download_url>
 tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
 cd flink-{{ site.version }}/
 ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
-~~~
+{% endhighlight %}
 
 ## Flink YARN Session
 
@@ -79,22 +79,22 @@ Download a Flink package for Hadoop >= 2 from the [download page]({{ site.downlo
 
 Extract the package using:
 
-~~~bash
+{% highlight bash %}
 tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
 cd flink-{{ site.version }}/
-~~~
+{% endhighlight %}
 
 #### Start a Session
 
 Use the following command to start a session
 
-~~~bash
+{% highlight bash %}
 ./bin/yarn-session.sh
-~~~
+{% endhighlight %}
 
 This command will show you the following overview:
 
-~~~bash
+{% highlight bash %}
 Usage:
    Required
      -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
@@ -108,15 +108,15 @@ Usage:
      -s,--slots <arg>                Number of slots per TaskManager
      -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
      -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode
-~~~
+{% endhighlight %}
 
 Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable to be set to read the YARN and HDFS configuration.
 
 **Example:** Issue the following command to allocate 10 Task Managers, with 8 GB of memory and 32 processing slots each:
 
-~~~bash
+{% highlight bash %}
 ./bin/yarn-session.sh -n 10 -tm 8192 -s 32
-~~~
+{% endhighlight %}
 
 The system will use the configuration in `conf/flink-conf.yaml`. Please follow our [configuration guide]({{ site.baseurl }}/ops/config.html) if you want to change something.
 
@@ -148,25 +148,25 @@ Use the YARN utilities (`yarn application -kill <appId>`) to stop the YARN sessi
 
 Use the following command to start a session
 
-~~~bash
+{% highlight bash %}
 ./bin/yarn-session.sh
-~~~
+{% endhighlight %}
 
 This command will show you the following overview:
 
-~~~bash
+{% highlight bash %}
 Usage:
    Required
      -id,--applicationId <yarnAppId> YARN application Id
-~~~
+{% endhighlight %}
 
 As already mentioned, the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable must be set to read the YARN and HDFS configuration.
 
 **Example:** Issue the following command to attach to running Flink YARN session `application_1463870264508_0029`:
 
-~~~bash
+{% highlight bash %}
 ./bin/yarn-session.sh -id application_1463870264508_0029
-~~~
+{% endhighlight %}
 
 Attaching to a running session uses the YARN ResourceManager to determine the JobManager RPC port.
 
@@ -176,15 +176,15 @@ Stop the YARN session by stopping the unix process (using CTRL+C) or by entering
 
 Use the following command to submit a Flink program to the YARN cluster:
 
-~~~bash
+{% highlight bash %}
 ./bin/flink
-~~~
+{% endhighlight %}
 
 Please refer to the documentation of the [command-line client]({{ site.baseurl }}/ops/cli.html).
 
 The command will show you a help menu like this:
 
-~~~bash
+{% highlight bash %}
 [...]
 Action "run" compiles and runs a program.
 
@@ -202,25 +202,25 @@ Action "run" compiles and runs a program.
                                       program. Optional flag to override the
                                       default value specified in the
                                       configuration
-~~~
+{% endhighlight %}
 
 Use the *run* action to submit a job to YARN. The client is able to determine the address of the JobManager. In the rare event of a problem, you can also pass the JobManager address using the `-m` argument. The JobManager address is visible in the YARN console.
 
 **Example**
 
-~~~bash
+{% highlight bash %}
 wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
 hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
 ./bin/flink run ./examples/batch/WordCount.jar \
         hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
-~~~
+{% endhighlight %}
 
 If you see the following error, make sure that all TaskManagers have started:
 
-~~~bash
+{% highlight bash %}
 Exception in thread "main" org.apache.flink.compiler.CompilerException:
     Available instances could not be determined from job manager: Connection timed out.
-~~~
+{% endhighlight %}
 
 You can check the number of TaskManagers in the JobManager web interface. The address of this interface is printed in the YARN session console.
 
@@ -235,9 +235,9 @@ Please note that the client then expects the `-yn` value to be set (number of Ta
 
 ***Example:***
 
-~~~bash
+{% highlight bash %}
 ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
-~~~
+{% endhighlight %}
 
 The command line options of the YARN session are also available with the `./bin/flink` tool. They are prefixed with a `y` or `yarn` (for the long argument options).
 
@@ -275,9 +275,9 @@ In cases where the Flink YARN session fails during the deployment itself, users
 To enable it, users have to set the `yarn.log-aggregation-enable` property to `true` in the `yarn-site.xml` file.
 Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.
 
-~~~
+{% highlight bash %}
 yarn logs -applicationId <application ID>
-~~~
+{% endhighlight %}
 
 Note that it takes a few seconds after the session has finished until the logs show up.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/filesystems.md
----------------------------------------------------------------------
diff --git a/docs/ops/filesystems.md b/docs/ops/filesystems.md
index 75ddf6d..50e3d24 100644
--- a/docs/ops/filesystems.md
+++ b/docs/ops/filesystems.md
@@ -79,9 +79,9 @@ The following configuration settings exist across different file systems
 
 If paths to files do not explicitly specify a file system scheme (and authority), a default scheme (and authority) will be used.
 
-~~~
+{% highlight yaml %}
 fs.default-scheme: <default-fs>
-~~~
+{% endhighlight %}
 
 For example, if the default file system is configured as `fs.default-scheme: hdfs://localhost:9000/`, then a file path of
 `/user/hugo/in.txt` is interpreted as `hdfs://localhost:9000/user/hugo/in.txt`.
@@ -96,13 +96,13 @@ For example, very small HDFS clusters with few RPC handlers can sometimes be ove
 To limit a specific file system's connections, add the following entries to the Flink configuration. The file system to be limited is identified by
 its scheme.
 
-~~~
+{% highlight yaml %}
 fs.<scheme>.limit.total: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.input: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.output: (number, 0/-1 mean no limit)
 fs.<scheme>.limit.timeout: (milliseconds, 0 means infinite)
 fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
-~~~
+{% endhighlight %}
 
 You can limit the number of input/output connections (streams) separately (`fs.<scheme>.limit.input` and `fs.<scheme>.limit.output`), as well as impose a limit on
 the total number of concurrent streams (`fs.<scheme>.limit.total`). If the file system tries to open more streams, the operation will block until some streams are closed.

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/security-ssl.md
----------------------------------------------------------------------
diff --git a/docs/ops/security-ssl.md b/docs/ops/security-ssl.md
index 961af89..ceb641b 100644
--- a/docs/ops/security-ssl.md
+++ b/docs/ops/security-ssl.md
@@ -41,31 +41,31 @@ You need to have a Java Keystore generated and copied to each node in the flink
 
 Execute the following keytool commands to create a truststore with a self signed CA
 
-~~~
+{% highlight bash %}
 keytool -genkeypair -alias ca -keystore ca.keystore -dname "CN=Sample CA" -storepass password -keypass password -keyalg RSA -ext bc=ca:true
 keytool -keystore ca.keystore -storepass password -alias ca -exportcert > ca.cer
 keytool -importcert -keystore ca.truststore -alias ca -storepass password -noprompt -file ca.cer
-~~~
+{% endhighlight %}
 
 Now create keystores for each node with certificates signed by the above CA. Let node1.company.org and node2.company.org be the hostnames with IPs 192.168.1.1 and 192.168.1.2 respectively
 
 #### Node 1
-~~~
+{% highlight bash %}
 keytool -genkeypair -alias node1 -keystore node1.keystore -dname "CN=node1.company.org" -ext SAN=dns:node1.company.org,ip:192.168.1.1 -storepass password -keypass password -keyalg RSA
 keytool -certreq -keystore node1.keystore -storepass password -alias node1 -file node1.csr
 keytool -gencert -keystore ca.keystore -storepass password -alias ca -ext SAN=dns:node1.company.org,ip:192.168.1.1 -infile node1.csr -outfile node1.cer
 keytool -importcert -keystore node1.keystore -storepass password -file ca.cer -alias ca -noprompt
 keytool -importcert -keystore node1.keystore -storepass password -file node1.cer -alias node1 -noprompt
-~~~
+{% endhighlight %}
 
 #### Node 2
-~~~
+{% highlight bash %}
 keytool -genkeypair -alias node2 -keystore node2.keystore -dname "CN=node2.company.org" -ext SAN=dns:node2.company.org,ip:192.168.1.2 -storepass password -keypass password -keyalg RSA
 keytool -certreq -keystore node2.keystore -storepass password -alias node2 -file node2.csr
 keytool -gencert -keystore ca.keystore -storepass password -alias ca -ext SAN=dns:node2.company.org,ip:192.168.1.2 -infile node2.csr -outfile node2.cer
 keytool -importcert -keystore node2.keystore -storepass password -file ca.cer -alias ca -noprompt
 keytool -importcert -keystore node2.keystore -storepass password -file node2.cer -alias node2 -noprompt
-~~~
+{% endhighlight %}
 
 ## Standalone Deployment
 Configure each node in the standalone cluster to pick up the keystore and truststore files present in the local file system.
@@ -76,24 +76,24 @@ Configure each node in the standalone cluster to pick up the keystore and trusts
 * Configure conf/flink-conf.yaml to pick up these files
 
 #### Node 1
-~~~
+{% highlight yaml %}
 security.ssl.enabled: true
 security.ssl.keystore: /usr/local/node1.keystore
 security.ssl.keystore-password: abc123
 security.ssl.key-password: abc123
 security.ssl.truststore: /usr/local/ca.truststore
 security.ssl.truststore-password: abc123
-~~~
+{% endhighlight %}
 
 #### Node 2
-~~~
+{% highlight yaml %}
 security.ssl.enabled: true
 security.ssl.keystore: /usr/local/node2.keystore
 security.ssl.keystore-password: abc123
 security.ssl.key-password: abc123
 security.ssl.truststore: /usr/local/ca.truststore
 security.ssl.truststore-password: abc123
-~~~
+{% endhighlight %}
 
 * Restart the flink components to enable SSL for all of flink's internal communication
 * Verify by accessing the jobmanager's UI using the https URL. The task manager's path in the UI should show akka.ssl.tcp:// as the protocol
@@ -106,14 +106,14 @@ The keystores and truststore can be deployed in a YARN setup in multiple ways de
 The keystores and truststore should be generated and deployed on all nodes in the YARN setup where flink components can potentially be executed. The same flink config file from the flink YARN client is used for all the flink components running in the YARN cluster. Therefore, we need to ensure that the keystore is deployed and accessible using the same file path on all YARN nodes.
 
 #### Example config
-~~~
+{% highlight yaml %}
 security.ssl.enabled: true
 security.ssl.keystore: /usr/local/node.keystore
 security.ssl.keystore-password: abc123
 security.ssl.key-password: abc123
 security.ssl.truststore: /usr/local/ca.truststore
 security.ssl.truststore-password: abc123
-~~~
+{% endhighlight %}
 
 Now you can start the YARN session from the CLI like you would normally do.
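
 For example (a sketch only; the session options are placeholders and depend on your setup and Flink version):

 {% highlight bash %}
 # The SSL settings are picked up automatically from conf/flink-conf.yaml.
 ./bin/yarn-session.sh -n 4
 {% endhighlight %}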
 
@@ -125,20 +125,20 @@ We can use the YARN client's ship files option (-yt) to distribute the keystores
 * Copy the keystore and the CA's truststore into a local directory (at the CLI's working directory), say `deploy-keys/`
 * Update the configuration to pick up the files from a relative path
 
-~~~
+{% highlight yaml %}
 security.ssl.enabled: true
 security.ssl.keystore: deploy-keys/node.keystore
 security.ssl.keystore-password: password
 security.ssl.key-password: password
 security.ssl.truststore: deploy-keys/ca.truststore
 security.ssl.truststore-password: password
-~~~
+{% endhighlight %}
 
 * Start the YARN session using the -yt parameter
 
-~~~
+{% highlight bash %}
 flink run -m yarn-cluster -yt deploy-keys/ TestJob.jar
-~~~
+{% endhighlight %}
 
 When deployed using YARN, flink's web dashboard is accessible through the YARN proxy's Tracking URL. To ensure that the YARN proxy is able to access flink's HTTPS URL, you need to configure the YARN proxy to accept flink's SSL certificates. To do so, add the custom CA certificate into Java's default truststore on the YARN proxy node.
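
 A minimal sketch of that last step, assuming a Java 8 directory layout and the default truststore password (`changeit`); the truststore path and the alias are placeholders that vary between Java installations:

 {% highlight bash %}
 # Import the custom CA certificate (ca.cer from above) into the JRE's default truststore.
 keytool -importcert -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -alias flink-ca -noprompt -file ca.cer
 {% endhighlight %}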
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/state/checkpoints.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/checkpoints.md b/docs/ops/state/checkpoints.md
index d2df4e6..98882c5 100644
--- a/docs/ops/state/checkpoints.md
+++ b/docs/ops/state/checkpoints.md
@@ -45,10 +45,10 @@ write their meta data out to persistent storage and are *not* automatically
 cleaned up when the job fails. This way, you will have a checkpoint around
 to resume from if your job fails.
 
-```java
+{% highlight java %}
 CheckpointConfig config = env.getCheckpointConfig();
 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-```
+{% endhighlight %}
 
 The `ExternalizedCheckpointCleanup` mode configures what happens with externalized checkpoints when you cancel the job:
 
@@ -64,9 +64,9 @@ files. The **target directory** for the externalized checkpoint's meta data is
 determined from the configuration key `state.checkpoints.dir` which, currently,
 can only be set via the configuration files.
 
-```
+{% highlight yaml %}
 state.checkpoints.dir: hdfs:///checkpoints/
-```
+{% endhighlight %}
 
 This directory will then contain the checkpoint meta data required to restore
 the checkpoint. For the `MemoryStateBackend`, this meta data file will be
@@ -76,9 +76,9 @@ self-contained and no further files are needed.
 and only write the paths to these files into the meta data file. These data
 files are stored at the path given to the state back-end during construction.
 
-```java
+{% highlight java %}
 env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
-```
+{% endhighlight %}
 
 ### Difference to Savepoints
 
@@ -95,8 +95,8 @@ meta data file is not self-contained, the jobmanager needs to have access to
 the data files it refers to (see [Directory Structure](#directory-structure)
 above).
 
-```sh
+{% highlight shell %}
 $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
-```
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/state/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md
index 1a714b3..6dd5154 100644
--- a/docs/ops/state/savepoints.md
+++ b/docs/ops/state/savepoints.md
@@ -59,12 +59,12 @@ If you don't specify the IDs manually they will be generated automatically. You
 
 You can think of a savepoint as holding a map of `Operator ID -> State` for each stateful operator:
 
-```
+{% highlight plain %}
 Operator ID | State
 ------------+------------------------
 source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
-```
+{% endhighlight %}
 
 In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.
 
@@ -84,7 +84,7 @@ When triggering a savepoint, a new savepoint directory is created where the data
 
 For example with a `FsStateBackend` or `RocksDBStateBackend`:
 
-```sh
+{% highlight shell %}
 # Savepoint target directory
 /savepoints/
 
@@ -96,7 +96,7 @@ For example with a `FsStateBackend` or `RocksDBStateBackend`:
 
 # Savepoint state
 /savepoints/savepoint-:shortjobid-:savepointid/...
-```
+{% endhighlight %}
 
 <div class="alert alert-info">
   <strong>Note:</strong>
@@ -108,33 +108,33 @@ Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state wi
 
 #### Trigger a Savepoint
 
-```sh
+{% highlight shell %}
 $ bin/flink savepoint :jobId [:targetDirectory]
-```
+{% endhighlight %}
 
 This will trigger a savepoint for the job with ID `:jobId` and return the path of the created savepoint. You need this path to restore and dispose of savepoints.
 
 #### Trigger a Savepoint with YARN
 
-```sh
+{% highlight shell %}
 $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
-```
+{% endhighlight %}
 
 This will trigger a savepoint for the job with ID `:jobId` and YARN application ID `:yarnAppId`, and return the path of the created savepoint.
 
 #### Cancel Job with Savepoint
 
-```sh
+{% highlight shell %}
 $ bin/flink cancel -s [:targetDirectory] :jobId
-```
+{% endhighlight %}
 
 This will atomically trigger a savepoint for the job with ID `:jobId` and cancel the job. Furthermore, you can specify a target file system directory to store the savepoint in. The directory needs to be accessible by the JobManager(s) and TaskManager(s).
 
 ### Resuming from Savepoints
 
-```sh
+{% highlight shell %}
 $ bin/flink run -s :savepointPath [:runArgs]
-```
+{% endhighlight %}
 
 This submits a job and specifies a savepoint to resume from. You may give a path to either the savepoint's directory or the `_metadata` file.
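
 For example, both of the following point at the same savepoint (the path is a placeholder following the directory layout shown above):

 {% highlight shell %}
 $ bin/flink run -s /savepoints/savepoint-:shortjobid-:savepointid/ [:runArgs]
 $ bin/flink run -s /savepoints/savepoint-:shortjobid-:savepointid/_metadata [:runArgs]
 {% endhighlight %}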
 
@@ -142,15 +142,15 @@ This submits a job and specifies a savepoint to resume from. You may give a path
 
 By default, the resume operation will try to map all state of the savepoint back to the program you are restoring with. If you dropped an operator, you can skip state that cannot be mapped to the new program via the `--allowNonRestoredState` (short: `-n`) option:
 
-```sh
+{% highlight shell %}
 $ bin/flink run -s :savepointPath -n [:runArgs]
-```
+{% endhighlight %}
 
 ### Disposing Savepoints
 
-```sh
+{% highlight shell %}
 $ bin/flink savepoint -d :savepointPath
-```
+{% endhighlight %}
 
 This disposes the savepoint stored in `:savepointPath`.
 
@@ -160,10 +160,10 @@ Note that it is possible to also manually delete a savepoint via regular file sy
 
 You can configure a default savepoint target directory via the `state.savepoints.dir` key. When triggering savepoints, this directory will be used to store the savepoint. You can override the default by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)).
 
-```sh
+{% highlight yaml %}
 # Default savepoint target directory
 state.savepoints.dir: hdfs:///flink/savepoints
-```
+{% endhighlight %}
 
 If you neither configure a default nor specify a custom target directory, triggering the savepoint will fail.
 
@@ -189,9 +189,9 @@ By default, a savepoint restore will try to match all state back to the restored
 
 You can allow non-restored state by passing the `--allowNonRestoredState` (short: `-n`) option to the run command:
 
-```sh
+{% highlight shell %}
 $ bin/flink run -s :savepointPath -n [:runArgs]
-```
+{% endhighlight %}
 
 ### What happens if I reorder stateful operators in my job?
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/state/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 3410623..c86f62d 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -156,7 +156,7 @@ In the case where the default state backend is set to *filesystem*, the entry `s
 
 A sample section in the configuration file could look as follows:
 
-~~~
+{% highlight yaml %}
 # The backend that will be used to store operator state checkpoints
 
 state.backend: filesystem
@@ -165,6 +165,6 @@ state.backend: filesystem
 # Directory for storing checkpoints
 
 state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/ops/upgrading.md
----------------------------------------------------------------------
diff --git a/docs/ops/upgrading.md b/docs/ops/upgrading.md
index f5c8ee0..11e469d 100644
--- a/docs/ops/upgrading.md
+++ b/docs/ops/upgrading.md
@@ -36,21 +36,21 @@ The line of action for upgrading a streaming application or migrating an applica
 There are two ways of taking a savepoint from a running streaming application.
 
 * Taking a savepoint and continuing processing.
-```
+{% highlight bash %}
 > ./bin/flink savepoint <jobID> [pathToSavepoint]
-```
+{% endhighlight %}
 It is recommended to periodically take savepoints in order to be able to restart an application from a previous point in time.
 
 * Taking a savepoint and stopping the application as a single action. 
-```
+{% highlight bash %}
 > ./bin/flink cancel -s [pathToSavepoint] <jobID>
-```
+{% endhighlight %}
 This means that the application is canceled immediately after the savepoint completed, i.e., no other checkpoints are taken after the savepoint.
 
 Given a savepoint taken from an application, the same or a compatible application (see the [Application State Compatibility](#application-state-compatibility) section below) can be started from that savepoint. Starting an application from a savepoint means that the state of its operators is initialized with the operator state persisted in the savepoint. This is done by passing the savepoint when starting the application:
-```
+{% highlight bash %}
 > ./bin/flink run -d -s [pathToSavepoint] ~/application.jar
-```
+{% endhighlight %}
 
 The operators of the started application are initialized with the operator state of the original application (i.e., the application the savepoint was taken from) at the time when the savepoint was taken. The started application continues processing from exactly this point on. 
 
@@ -66,10 +66,10 @@ In this section, we discuss how applications can be modified to remain state com
 
 When an application is restarted from a savepoint, Flink matches the operator state stored in the savepoint to stateful operators of the started application. The matching is done based on operator IDs, which are also stored in the savepoint. Each operator has a default ID that is derived from the operator's position in the application's operator topology. Hence, an unmodified application can always be restarted from one of its own savepoints. However, the default IDs of operators are likely to change if an application is modified. Therefore, modified applications can only be started from a savepoint if the operator IDs have been explicitly specified. Assigning IDs to operators is very simple and done using the `uid(String)` method as follows:
 
-```
+{% highlight scala %}
 val mappedEvents: DataStream[(Int, Long)] = events
   .map(new MyStatefulMapFunc()).uid("mapper-1")
-```
+{% endhighlight %}
 
 **Note:** Since the operator IDs stored in a savepoint and IDs of operators in the application to start must be equal, it is highly recommended to assign unique IDs to all operators of an application that might be upgraded in the future. This advice applies to all operators, i.e., operators with and without explicitly declared operator state, because some operators have internal state that is not visible to the user. Upgrading an application without assigned operator IDs is significantly more difficult and may only be possible via a low-level workaround using the `setUidHash()` method.
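
 As a rough, purely illustrative sketch of that workaround (assuming `setUidHash()` is available on the mapped stream in the same way as `uid()`; the hash value is a placeholder for the operator hash recorded in the savepoint):

 {% highlight scala %}
 val mappedEvents: DataStream[(Int, Long)] = events
   .map(new MyStatefulMapFunc())
   // pin the operator to the hash recorded in the savepoint (placeholder value)
   .setUidHash("39c1f2a0b1c2d3e4f5a6b7c8d9e0f1a2")
 {% endhighlight %}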
 
@@ -171,9 +171,9 @@ savepoints can be relocated using typical filesystem operations..
 The first major step in job migration is taking a savepoint of your job running in the older Flink version.
 You can do this with the command:
 
-```sh
+{% highlight shell %}
 $ bin/flink savepoint :jobId [:targetDirectory]
-```
+{% endhighlight %}
 
 For more details, please read the [savepoint documentation]({{ site.baseurl }}/ops/state/savepoints.html).
 
@@ -190,9 +190,9 @@ If you are unfamiliar with installing Flink in your cluster, please read the [de
 As the last step of job migration, you resume from the savepoint taken above on the updated cluster. You can do
 this with the command:
 
-```sh
+{% highlight shell %}
 $ bin/flink run -s :savepointPath [:runArgs]
-```
+{% endhighlight %}
 
 Again, for more details, please take a look at the [savepoint documentation]({{ site.baseurl }}/ops/state/savepoints.html).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/quickstart/run_example_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/run_example_quickstart.md b/docs/quickstart/run_example_quickstart.md
index 836bfab..1a79338 100644
--- a/docs/quickstart/run_example_quickstart.md
+++ b/docs/quickstart/run_example_quickstart.md
@@ -366,7 +366,7 @@ $ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar
 
 The output of that command should look similar to this, if everything went according to plan:
 
-```
+{% highlight plain %}
 03/08/2016 15:09:27 Job execution switched to status RUNNING.
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
@@ -374,7 +374,7 @@ The output of that command should look similar to this, if everything went accor
 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
 03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
-```
+{% endhighlight %}
 
 You can see how the individual operators start running. There are only two, because
 the operations after the window get folded into one operation for performance reasons. In Flink

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/quickstart/scala_api_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/scala_api_quickstart.md b/docs/quickstart/scala_api_quickstart.md
index a7b73e3..e3143ff 100644
--- a/docs/quickstart/scala_api_quickstart.md
+++ b/docs/quickstart/scala_api_quickstart.md
@@ -76,9 +76,9 @@ In order to run your project you have to issue the `sbt run` command.
 By default, this will run your job in the same JVM as `sbt` is running.
 In order to run your job in a distinct JVM, add the following line to `build.sbt`:
 
-~~~scala
+{% highlight scala %}
 fork in run := true
-~~~
+{% endhighlight %}
 
 
 #### IntelliJ
@@ -98,15 +98,15 @@ In order to import the newly created project into [Eclipse](https://eclipse.org/
 These project files can be created via the [sbteclipse](https://github.com/typesafehub/sbteclipse) plugin.
 Add the following line to your `PROJECT_DIR/project/plugins.sbt` file:
 
-~~~bash
+{% highlight bash %}
 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
-~~~
+{% endhighlight %}
 
 In `sbt`, use the following command to create the Eclipse project files:
 
-~~~bash
+{% highlight bash %}
 > eclipse
-~~~
+{% endhighlight %}
 
 Now you can import the project into Eclipse via `File -> Import... -> Existing Projects into Workspace` and then select the project directory.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/quickstart/setup_quickstart.md
----------------------------------------------------------------------
diff --git a/docs/quickstart/setup_quickstart.md b/docs/quickstart/setup_quickstart.md
index b350d02..fef4aef 100644
--- a/docs/quickstart/setup_quickstart.md
+++ b/docs/quickstart/setup_quickstart.md
@@ -34,17 +34,17 @@ Flink runs on __Linux, Mac OS X, and Windows__. To be able to run Flink, the onl
 
 You can check the correct installation of Java by issuing the following command:
 
-~~~bash
+{% highlight bash %}
 java -version
-~~~
+{% endhighlight %}
 
 If you have Java 8, the output will look something like this:
 
-~~~bash
+{% highlight bash %}
 java version "1.8.0_111"
 Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
 Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
-~~~
+{% endhighlight %}
 
 {% if site.is_stable %}
 <div class="codetabs" markdown="1">
@@ -56,22 +56,22 @@ Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
 2. Go to the download directory.
 3. Unpack the downloaded archive.
 
-~~~bash
+{% highlight bash %}
 $ cd ~/Downloads        # Go to download directory
 $ tar xzf flink-*.tgz   # Unpack the downloaded archive
 $ cd flink-{{site.version}}
-~~~
+{% endhighlight %}
 </div>
 
 <div data-lang="MacOS X" markdown="1">
 For MacOS X users, Flink can be installed through [Homebrew](https://brew.sh/).
 
-~~~bash
+{% highlight bash %}
 $ brew install apache-flink
 ...
 $ flink --version
 Version: 1.2.0, Commit ID: 1c659cf
-~~~
+{% endhighlight %}
 </div>
 
 </div>
@@ -80,19 +80,19 @@ Version: 1.2.0, Commit ID: 1c659cf
 ### Download and Compile
 Clone the source code from one of our [repositories](http://flink.apache.org/community.html#source-code), e.g.:
 
-~~~bash
+{% highlight bash %}
 $ git clone https://github.com/apache/flink.git
 $ cd flink
 $ mvn clean package -DskipTests # this will take up to 10 minutes
 $ cd build-target               # this is where Flink is installed to
-~~~
+{% endhighlight %}
 {% endif %}
 
 ### Start a Local Flink Cluster
 
-~~~bash
+{% highlight bash %}
 $ ./bin/start-cluster.sh  # Start Flink
-~~~
+{% endhighlight %}
 
 Check the __JobManager's web frontend__ at [http://localhost:8081](http://localhost:8081) and make sure everything is up and running. The web frontend should report a single available TaskManager instance.
 
@@ -100,13 +100,13 @@ Check the __JobManager's web frontend__ at [http://localhost:8081](http://localh
 
 You can also verify that the system is running by checking the log files in the `logs` directory:
 
-~~~bash
+{% highlight bash %}
 $ tail log/flink-*-jobmanager-*.log
 INFO ... - Starting JobManager
 INFO ... - Starting JobManager web frontend
 INFO ... - Web frontend listening at 127.0.0.1:8081
 INFO ... - Registered TaskManager at 127.0.0.1 (akka://flink/user/taskmanager)
-~~~
+{% endhighlight %}
 
 ## Read the Code
 
@@ -233,13 +233,13 @@ window of processing time, as long as words are floating in.
 
 * First of all, we use **netcat** to start a local server via
 
-  ~~~bash
+  {% highlight bash %}
   $ nc -l 9000
-  ~~~
+  {% endhighlight %}
 
 * Submit the Flink program:
 
-  ~~~bash
+  {% highlight bash %}
   $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
 
   Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
@@ -255,7 +255,7 @@ window of processing time, as long as words are floating in.
   11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to DEPLOYING
   11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to RUNNING
   11/04/2016 14:04:51     Source: Socket Stream -> Flat Map(1/1) switched to RUNNING
-  ~~~
+  {% endhighlight %}
 
   The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected:
 
@@ -273,28 +273,28 @@ window of processing time, as long as words are floating in.
   and write some text in `nc` (input is sent to Flink line by line after
   hitting <RETURN>):
 
-  ~~~bash
+  {% highlight bash %}
   $ nc -l 9000
   lorem ipsum
   ipsum ipsum ipsum
   bye
-  ~~~
+  {% endhighlight %}
 
   The `.out` file will print the counts at the end of each time window as long
   as words are floating in, e.g.:
 
-  ~~~bash
+  {% highlight bash %}
   $ tail -f log/flink-*-taskmanager-*.out
   lorem : 1
   bye : 1
   ipsum : 4
-  ~~~~
+  {% endhighlight %}
 
   To **stop** Flink when you're done, type:
 
-  ~~~bash
+  {% highlight bash %}
   $ ./bin/stop-local.sh
-  ~~~
+  {% endhighlight %}
 
 ## Next Steps
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/start/building.md
----------------------------------------------------------------------
diff --git a/docs/start/building.md b/docs/start/building.md
index 568c4b8..a92cfe9 100644
--- a/docs/start/building.md
+++ b/docs/start/building.md
@@ -38,15 +38,15 @@ To build unit tests use Java 8u51 or above to prevent failures in unit tests tha
 
 To clone from git, enter:
 
-~~~bash
+{% highlight bash %}
 git clone {{ site.github_url }}
-~~~
+{% endhighlight %}
 
 The simplest way of building Flink is by running:
 
-~~~bash
+{% highlight bash %}
 mvn clean install -DskipTests
-~~~
+{% endhighlight %}
 
 This instructs [Maven](http://maven.apache.org) (`mvn`) to first remove all existing builds (`clean`) and then create a new Flink binary (`install`).
 
@@ -66,11 +66,11 @@ It is sufficient to call `mvn clean install -DskipTests` in the root directory o
 **Maven 3.3.x**
 The build has to be done in two steps: First in the base directory, then in the distribution project:
 
-~~~bash
+{% highlight bash %}
 mvn clean install -DskipTests
 cd flink-dist
 mvn clean install
-~~~
+{% endhighlight %}
 
 *Note:* To check your Maven version, run `mvn --version`.
 
@@ -85,17 +85,17 @@ Flink has dependencies to HDFS and YARN which are both dependencies from [Apache
 Hadoop is only supported from version 2.4.0 upwards.
 You can also specify a specific Hadoop version to build against:
 
-~~~bash
+{% highlight bash %}
 mvn clean install -DskipTests -Dhadoop.version=2.6.1
-~~~
+{% endhighlight %}
 
 ### Vendor-specific Versions
 
 To build Flink against a vendor-specific Hadoop version, issue the following command:
 
-~~~bash
+{% highlight bash %}
 mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.1-cdh5.0.0
-~~~
+{% endhighlight %}
 
 The `-Pvendor-repos` activates a Maven [build profile](http://maven.apache.org/guides/introduction/introduction-to-profiles.html) that includes the repositories of popular Hadoop vendors such as Cloudera, Hortonworks, or MapR.
 
@@ -119,12 +119,12 @@ If your home directory is encrypted you might encounter a `java.io.IOException:
 
 The workaround is to add:
 
-~~~xml
+{% highlight xml %}
 <args>
     <arg>-Xmax-classfile-name</arg>
     <arg>128</arg>
 </args>
-~~~
+{% endhighlight %}
 
 in the compiler configuration of the `pom.xml` file of the module causing the error. For example, if the error appears in the `flink-yarn` module, the above code should be added under the `<configuration>` tag of `scala-maven-plugin`, as sketched below. See [this issue](https://issues.apache.org/jira/browse/FLINK-2003) for more information.
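
 The resulting plugin section could look roughly as follows (a sketch only; the plugin version and any configuration your module already has are omitted):

 {% highlight xml %}
 <plugin>
     <groupId>net.alchim31.maven</groupId>
     <artifactId>scala-maven-plugin</artifactId>
     <configuration>
         <args>
             <arg>-Xmax-classfile-name</arg>
             <arg>128</arg>
         </args>
     </configuration>
 </plugin>
 {% endhighlight %}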
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/start/flink_on_windows.md
----------------------------------------------------------------------
diff --git a/docs/start/flink_on_windows.md b/docs/start/flink_on_windows.md
index e2f4228..b8dcbf7 100644
--- a/docs/start/flink_on_windows.md
+++ b/docs/start/flink_on_windows.md
@@ -30,14 +30,14 @@ To start Flink in from the *Windows Command Line*, open the command window, navi
 
 Note: The ``bin`` folder of your Java Runtime Environment must be included in Windows' ``%PATH%`` variable. Follow this [guide](http://www.java.com/en/download/help/path.xml) to add Java to the ``%PATH%`` variable.
 
-~~~bash
+{% highlight bash %}
 $ cd flink
 $ cd bin
 $ start-cluster.bat
 Starting a local cluster with one JobManager process and one TaskManager process.
 You can terminate the processes via CTRL-C in the spawned shell windows.
 Web interface by default on http://localhost:8081/.
-~~~
+{% endhighlight %}
 
 After that, you need to open a second terminal to run jobs using `flink.bat`.
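
 For example (a sketch only; the example jar and its arguments are placeholders):

 {% highlight bash %}
 $ cd flink
 $ bin\flink.bat run examples\streaming\SocketWindowWordCount.jar --port 9000
 {% endhighlight %}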
 
@@ -47,11 +47,11 @@ After that, you need to open a second terminal to run jobs using `flink.bat`.
 
 With *Cygwin* you need to start the Cygwin Terminal, navigate to your Flink directory and run the `start-cluster.sh` script:
 
-~~~bash
+{% highlight bash %}
 $ cd flink
 $ bin/start-cluster.sh
 Starting cluster.
-~~~
+{% endhighlight %}
 
 {% top %}
 
@@ -59,9 +59,9 @@ Starting cluster.
 
 If you are installing Flink from the git repository and you are using the Windows git shell, Cygwin can produce a failure similar to this one:
 
-~~~bash
+{% highlight bash %}
 c:/flink/bin/start-cluster.sh: line 30: $'\r': command not found
-~~~
+{% endhighlight %}
 
 This error occurs because git is automatically transforming UNIX line endings to Windows-style line endings when running on Windows. The problem is that Cygwin can only deal with UNIX-style line endings. The solution is to adjust the Cygwin settings to use the correct line endings by following these three steps:
 
@@ -69,18 +69,18 @@ This error occurs because git is automatically transforming UNIX line endings to
 
 2. Determine your home directory by entering
 
-    ~~~bash
+    {% highlight bash %}
     cd; pwd
-    ~~~
+    {% endhighlight %}
 
     This will return a path under the Cygwin root path.
 
 3. Using Notepad, WordPad, or another text editor, open the file `.bash_profile` in the home directory and append the following (if the file does not exist, you will have to create it):
 
-~~~bash
+{% highlight bash %}
 export SHELLOPTS
 set -o igncr
-~~~
+{% endhighlight %}
 
 Save the file and open a new bash shell.
 


[2/2] flink git commit: [FLINK-8726][docs] Fix and normalize code-highlighting

Posted by ch...@apache.org.
[FLINK-8726][docs] Fix and normalize code-highlighting

This closes #5909.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b13c70b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b13c70b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b13c70b6

Branch: refs/heads/master
Commit: b13c70b60b04e0d51604d0e37868612f77e86299
Parents: 545d530
Author: zentol <ch...@apache.org>
Authored: Wed Apr 25 12:29:08 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 3 23:05:16 2018 +0200

----------------------------------------------------------------------
 docs/dev/batch/connectors.md              |  32 +-
 docs/dev/batch/dataset_transformations.md | 516 ++++++++++++-------------
 docs/dev/batch/examples.md                |  48 +--
 docs/dev/batch/fault_tolerance.md         |   8 +-
 docs/dev/batch/hadoop_compatibility.md    |  28 +-
 docs/dev/batch/iterations.md              |  12 +-
 docs/dev/best_practices.md                |   8 +-
 docs/dev/cluster_execution.md             |   8 +-
 docs/dev/connectors/filesystem_sink.md    |   4 +-
 docs/dev/datastream_api.md                |   4 +-
 docs/dev/java8.md                         |  36 +-
 docs/dev/libs/cep.md                      |  12 +-
 docs/dev/libs/gelly/index.md              |  20 +-
 docs/dev/libs/ml/optimization.md          |   4 +-
 docs/dev/libs/storm_compatibility.md      |  28 +-
 docs/dev/local_execution.md               |  16 +-
 docs/dev/migration.md                     |   4 +-
 docs/dev/restart_strategies.md            |  20 +-
 docs/dev/scala_shell.md                   |  44 +--
 docs/dev/table/sql.md                     |   4 +-
 docs/dev/table/streaming.md               |   8 +-
 docs/monitoring/application_profiling.md  |   8 +-
 docs/monitoring/checkpoint_monitoring.md  |   4 +-
 docs/monitoring/historyserver.md          |  12 +-
 docs/monitoring/logging.md                |  20 +-
 docs/monitoring/metrics.md                |  20 +-
 docs/monitoring/rest_api.md               |  92 ++---
 docs/ops/cli.md                           |   4 +-
 docs/ops/config.md                        |   4 +-
 docs/ops/deployment/aws.md                |  68 ++--
 docs/ops/deployment/cluster_setup.md      |  16 +-
 docs/ops/deployment/hadoop.md             |   4 +-
 docs/ops/deployment/mapr_setup.md         |  28 +-
 docs/ops/deployment/yarn_setup.md         |  60 +--
 docs/ops/filesystems.md                   |   8 +-
 docs/ops/security-ssl.md                  |  32 +-
 docs/ops/state/checkpoints.md             |  16 +-
 docs/ops/state/savepoints.md              |  40 +-
 docs/ops/state/state_backends.md          |   4 +-
 docs/ops/upgrading.md                     |  24 +-
 docs/quickstart/run_example_quickstart.md |   4 +-
 docs/quickstart/scala_api_quickstart.md   |  12 +-
 docs/quickstart/setup_quickstart.md       |  48 +--
 docs/start/building.md                    |  24 +-
 docs/start/flink_on_windows.md            |  20 +-
 45 files changed, 718 insertions(+), 718 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/batch/connectors.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/connectors.md b/docs/dev/batch/connectors.md
index 2391744..7677137 100644
--- a/docs/dev/batch/connectors.md
+++ b/docs/dev/batch/connectors.md
@@ -64,12 +64,12 @@ See [Deployment & Operations - Deployment - AWS - S3: Simple Storage Service]({{
 
 For Alluxio support add the following entry into the `core-site.xml` file:
 
-~~~xml
+{% highlight xml %}
 <property>
   <name>fs.alluxio.impl</name>
   <value>alluxio.hadoop.FileSystem</value>
 </property>
-~~~
+{% endhighlight %}
 
 
 ## Connecting to other systems using Input/OutputFormat wrappers for Hadoop
@@ -89,28 +89,28 @@ This section shows some examples for connecting Flink to other systems.
 Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This allows you to easily read from Avro files with Flink.
 Also, the serialization framework of Flink is able to handle classes generated from Avro schemas. Be sure to include the Flink Avro dependency in the `pom.xml` of your project.
 
-~~~xml
+{% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-avro</artifactId>
   <version>{{site.version }}</version>
 </dependency>
-~~~
+{% endhighlight %}
 
 In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
 
 **Example**:
 
-~~~java
+{% highlight java %}
 AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
 DataSet<User> usersDS = env.createInput(users);
-~~~
+{% endhighlight %}
 
 Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection on these POJOs. For example:
 
-~~~java
+{% highlight java %}
 usersDS.groupBy("name")
-~~~
+{% endhighlight %}
 
 
 Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data-intensive and thus probably slow to use.
@@ -129,21 +129,21 @@ This example is using the `HadoopInputFormat` wrapper to use an existing Hadoop
 1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central; therefore, we have to build the project ourselves.
 Execute the following commands:
 
-   ~~~bash
+   {% highlight bash %}
    git clone https://github.com/mooso/azure-tables-hadoop.git
    cd azure-tables-hadoop
    mvn clean install
-   ~~~
+   {% endhighlight %}
 
 2. Set up a new Flink project using the quickstarts:
 
-   ~~~bash
+   {% highlight bash %}
    curl https://flink.apache.org/q/quickstart.sh | bash
-   ~~~
+   {% endhighlight %}
 
 3. Add the following dependencies (in the `<dependencies>` section) to your `pom.xml` file:
 
-   ~~~xml
+   {% highlight xml %}
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility{{ site.scala_version_suffix }}</artifactId>
@@ -154,7 +154,7 @@ Execute the following commands:
      <artifactId>microsoft-hadoop-azure</artifactId>
      <version>0.0.4</version>
    </dependency>
-   ~~~
+   {% endhighlight %}
 
    `flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
    `microsoft-hadoop-azure` adds the project we built above to our project.
@@ -164,7 +164,7 @@ Browse to the code of the `Job.java` file. Its an empty skeleton for a Flink job
 
 Paste the following code into it:
 
-~~~java
+{% highlight java %}
 import java.util.Map;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -217,7 +217,7 @@ public class AzureTableExample {
     env.execute("Azure Example");
   }
 }
-~~~
+{% endhighlight %}
 
 The example shows how to access an Azure table and turn its data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). With the `DataSet`, you can apply all known transformations to it.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/batch/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/dataset_transformations.md b/docs/dev/batch/dataset_transformations.md
index d63ee88..673de60 100644
--- a/docs/dev/batch/dataset_transformations.md
+++ b/docs/dev/batch/dataset_transformations.md
@@ -42,7 +42,7 @@ The following code transforms a DataSet of Integer pairs into a DataSet of Integ
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // MapFunction that adds two integer values
 public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
   @Override
@@ -54,22 +54,22 @@ public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer>
 // [...]
 DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
 DataSet<Integer> intSums = intPairs.map(new IntAdder());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val intPairs: DataSet[(Int, Int)] = // [...]
 val intSums = intPairs.map { pair => pair._1 + pair._2 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  intSums = intPairs.map(lambda x: sum(x))
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -84,7 +84,7 @@ The following code transforms a DataSet of text lines into a DataSet of words:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
 public class Tokenizer implements FlatMapFunction<String, String> {
   @Override
@@ -98,22 +98,22 @@ public class Tokenizer implements FlatMapFunction<String, String> {
 // [...]
 DataSet<String> textLines = // [...]
 DataSet<String> words = textLines.flatMap(new Tokenizer());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val textLines: DataSet[String] = // [...]
 val words = textLines.flatMap { _.split(" ") }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  words = lines.flat_map(lambda x,c: [line.split() for line in x])
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -129,7 +129,7 @@ The following code transforms a DataSet of text lines into a DataSet of counts p
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 public class PartitionCounter implements MapPartitionFunction<String, Long> {
 
   public void mapPartition(Iterable<String> values, Collector<Long> out) {
@@ -144,24 +144,24 @@ public class PartitionCounter implements MapPartitionFunction<String, Long> {
 // [...]
 DataSet<String> textLines = // [...]
 DataSet<Long> counts = textLines.mapPartition(new PartitionCounter());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val textLines: DataSet[String] = // [...]
 // Some is required because the return value must be a Collection.
 // There is an implicit conversion from Option to a Collection.
 val counts = textLines.mapPartition { in => Some(in.size) }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  counts = lines.map_partition(lambda x,c: [sum(1 for _ in x)])
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -175,7 +175,7 @@ The following code removes all Integers smaller than zero from a DataSet:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // FilterFunction that filters out all Integers smaller than zero.
 public class NaturalNumberFilter implements FilterFunction<Integer> {
   @Override
@@ -187,22 +187,22 @@ public class NaturalNumberFilter implements FilterFunction<Integer> {
 // [...]
 DataSet<Integer> intNumbers = // [...]
 DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val intNumbers: DataSet[Int] = // [...]
 val naturalNumbers = intNumbers.filter { _ > 0 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  naturalNumbers = intNumbers.filter(lambda x: x > 0)
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -222,40 +222,40 @@ The following code shows different ways to apply a Project transformation on a D
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple3<Integer, Double, String>> in = // [...]
 // converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
 DataSet<Tuple2<String, Integer>> out = in.project(2,0);
-~~~
+{% endhighlight %}
 
 #### Projection with Type Hint
 
 Note that the Java compiler cannot infer the return type of the `project` operator. This can cause a problem if you call another operator on the result of a `project` operator, such as:
 
-~~~java
+{% highlight java %}
 DataSet<Tuple5<String,String,String,String,String>> ds = ....
 DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);
-~~~
+{% endhighlight %}
 
 This problem can be overcome by hinting the return type of the `project` operator like this:
 
-~~~java
+{% highlight java %}
 DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 out = in.project(2,0);
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -294,7 +294,7 @@ with a reduce function.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // some ordinary POJO
 public class WC {
   public String word;
@@ -317,12 +317,12 @@ DataSet<WC> wordCounts = words
                          .groupBy("word")
                          // apply ReduceFunction on grouped DataSet
                          .reduce(new WordCounter());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 // some ordinary POJO
 class WC(val word: String, val count: Int) {
   def this() {
@@ -335,14 +335,14 @@ val words: DataSet[WC] = // [...]
 val wordCounts = words.groupBy("word").reduce {
   (w1, w2) => new WC(w1.word, w1.count + w2.count)
 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 </div>
 </div>
 
@@ -356,7 +356,7 @@ with a reduce function.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // some ordinary POJO
 public class WC {
   public String word;
@@ -386,12 +386,12 @@ public class SelectWord implements KeySelector<WC, String> {
     return w.word;
   }
 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 // some ordinary POJO
 class WC(val word: String, val count: Int) {
   def this() {
@@ -404,12 +404,12 @@ val words: DataSet[WC] = // [...]
 val wordCounts = words.groupBy { _.word } reduce {
   (w1, w2) => new WC(w1.word, w1.count + w2.count)
 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 class WordCounter(ReduceFunction):
     def reduce(self, in1, in2):
         return (in1[0], in1[1] + in2[1])
@@ -418,7 +418,7 @@ words = // [...]
 wordCounts = words \
     .group_by(lambda x: x[0]) \
     .reduce(WordCounter())
-~~~
+{% endhighlight %}
 </div>
 </div>
 
@@ -430,30 +430,30 @@ The following code shows how to use field position keys and apply a reduce funct
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
 DataSet<Tuple3<String, Integer, Double>> reducedTuples = tuples
                                          // group DataSet on first and second field of Tuple
                                          .groupBy(0, 1)
                                          // apply ReduceFunction on grouped DataSet
                                          .reduce(new MyTupleReducer());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val tuples: DataSet[(String, Int, Double)] = // [...]
 // group on the first and second Tuple field
 val reducedTuples = tuples.groupBy(0, 1).reduce { ... }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  reducedTuples = tuples.group_by(0, 1).reduce( ... )
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -465,25 +465,25 @@ When using Case Classes you can also specify the grouping key using the names of
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 Not supported.
-~~~
+{% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 case class MyClass(val a: String, b: Int, c: Double)
 val tuples: DataSet[MyClass] = // [...]
 // group on the first and second field
 val reducedTuples = tuples.groupBy("a", "b").reduce { ... }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 </div>
 </div>
 
@@ -502,7 +502,7 @@ The following code shows how duplicate strings can be removed from a DataSet gro
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 public class DistinctReduce
          implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
 
@@ -530,23 +530,23 @@ DataSet<Tuple2<Integer, String>> input = // [...]
 DataSet<Tuple2<Integer, String>> output = input
                            .groupBy(0)            // group DataSet by the first tuple field
                            .reduceGroup(new DistinctReduce());  // apply GroupReduceFunction
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[(Int, String)] = // [...]
 val output = input.groupBy(0).reduceGroup {
       (in, out: Collector[(Int, String)]) =>
         in.toSet foreach (out.collect)
     }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  class DistinctReduce(GroupReduceFunction):
    def reduce(self, iterator, collector):
      dic = dict()
@@ -556,7 +556,7 @@ val output = input.groupBy(0).reduceGroup {
        collector.collect(key)
 
  output = data.group_by(0).reduce_group(DistinctReduce())
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -578,7 +578,7 @@ The following code shows another example how to remove duplicate Strings in a Da
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // GroupReduceFunction that removes consecutive identical elements
 public class DistinctReduce
          implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
@@ -607,12 +607,12 @@ DataSet<Double> output = input
                          .groupBy(0)                         // group DataSet by first field
                          .sortGroup(1, Order.ASCENDING)      // sort groups on second tuple field
                          .reduceGroup(new DistinctReduce());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[(Int, String)] = // [...]
 val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
       (in, out: Collector[(Int, String)]) =>
@@ -624,12 +624,12 @@ val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
         }
     }
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  class DistinctReduce(GroupReduceFunction):
    def reduce(self, iterator, collector):
      dic = dict()
@@ -639,7 +639,7 @@ val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
        collector.collect(key)
 
  output = data.group_by(0).sort_group(1, Order.ASCENDING).reduce_group(DistinctReduce())
-~~~
+{% endhighlight %}
 
 
 </div>
@@ -660,7 +660,7 @@ of the `GroupReduceFunction` as shown in the following example:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // Combinable GroupReduceFunction that computes a sum.
 public class MyCombinableGroupReducer implements
   GroupReduceFunction<Tuple2<String, Integer>, String>,
@@ -695,12 +695,12 @@ public class MyCombinableGroupReducer implements
     out.collect(new Tuple2<>(key, sum));
   }
 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 
 // Combinable GroupReduceFunction that computes two sums.
 class MyCombinableGroupReducer
@@ -727,12 +727,12 @@ class MyCombinableGroupReducer
     out.collect(r)
   }
 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  class GroupReduce(GroupReduceFunction):
    def reduce(self, iterator, collector):
      key, int_sum = iterator.next()
@@ -747,7 +747,7 @@ class MyCombinableGroupReducer
      collector.collect((key, int_sum))
 
 data.reduce_group(GroupReduce(), combinable=True)
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -778,7 +778,7 @@ an alternative WordCount implementation.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<String> input = [..] // The words received as input
 
 DataSet<Tuple2<String, Integer>> combinedWords = input
@@ -814,12 +814,12 @@ DataSet<Tuple2<String, Integer>> output = combinedWords
         out.collect(new Tuple2(key, count));
     }
 });
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[String] = [..] // The words received as input
 
 val combinedWords: DataSet[(String, Int)] = input
@@ -850,14 +850,14 @@ val output: DataSet[(String, Int)] = combinedWords
         out.collect((key, sum))
 }
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -883,31 +883,31 @@ The following code shows how to apply an Aggregation transformation on a DataSet
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple3<Integer, String, Double>> input = // [...]
 DataSet<Tuple3<Integer, String, Double>> output = input
                                    .groupBy(1)        // group DataSet on second field
                                    .aggregate(SUM, 0) // compute sum of the first field
                                    .and(MIN, 2);      // compute minimum of the third field
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[(Int, String, Double)] = // [...]
 val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 from flink.functions.Aggregation import Sum, Min
 
 input = # [...]
 output = input.group_by(1).aggregate(Sum, 0).and_agg(Min, 2)
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -926,29 +926,29 @@ The following code shows how to select the tuple with the minimum values for the
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple3<Integer, String, Double>> input = // [...]
 DataSet<Tuple3<Integer, String, Double>> output = input
                                    .groupBy(1)   // group DataSet on second field
                                    .minBy(0, 2); // select tuple with minimum values for first and third field.
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[(Int, String, Double)] = // [...]
 val output: DataSet[(Int, String, Double)] = input
                                    .groupBy(1)  // group DataSet on second field
                                    .minBy(0, 2) // select tuple with minimum values for first and third field.
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -963,7 +963,7 @@ The following code shows how to sum all elements of an Integer DataSet:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // ReduceFunction that sums Integers
 public class IntSummer implements ReduceFunction<Integer> {
   @Override
@@ -975,23 +975,23 @@ public class IntSummer implements ReduceFunction<Integer> {
 // [...]
 DataSet<Integer> intNumbers = // [...]
 DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val intNumbers = env.fromElements(1,2,3)
 val sum = intNumbers.reduce (_ + _)
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  intNumbers = env.from_elements(1,2,3)
  sum = intNumbers.reduce(lambda x,y: x + y)
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1008,26 +1008,26 @@ The following example shows how to apply a GroupReduce transformation on a full
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Integer> input = // [...]
 // apply a (preferably combinable) GroupReduceFunction to a DataSet
 DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[Int] = // [...]
 val output = input.reduceGroup(new MyGroupReducer())
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  output = data.reduce_group(MyGroupReducer())
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1059,31 +1059,31 @@ The following code shows how to apply an Aggregation transformation on a full Da
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<Integer, Double>> input = // [...]
 DataSet<Tuple2<Integer, Double>> output = input
                                      .aggregate(SUM, 0)    // compute sum of the first field
                                      .and(MIN, 1);    // compute minimum of the second field
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[(Int, String, Double)] = // [...]
 val output = input.aggregate(SUM, 0).and(MIN, 2)
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 from flink.functions.Aggregation import Sum, Min
 
 input = # [...]
 output = input.aggregate(Sum, 0).and_agg(Min, 2)
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1099,27 +1099,27 @@ The following code shows how to select the tuple with the maximum values for the
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple3<Integer, String, Double>> input = // [...]
 DataSet<Tuple3<Integer, String, Double>> output = input
                                    .maxBy(0, 2); // select tuple with maximum values for first and third field.
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[(Int, String, Double)] = // [...]
 val output: DataSet[(Int, String, Double)] = input                          
                                    .maxBy(0, 2) // select tuple with maximum values for first and third field.
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1132,27 +1132,27 @@ The following code removes all duplicate elements from the DataSet:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<Integer, Double>> input = // [...]
 DataSet<Tuple2<Integer, Double>> output = input.distinct();
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[(Int, String, Double)] = // [...]
 val output = input.distinct()
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1168,27 +1168,27 @@ It is also possible to change how the distinction of the elements in the DataSet
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<Integer, Double, String>> input = // [...]
 DataSet<Tuple2<Integer, Double, String>> output = input.distinct(0,2);
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[(Int, Double, String)] = // [...]
 val output = input.distinct(0,2)
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1198,7 +1198,7 @@ Not supported.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 private static class AbsSelector implements KeySelector<Integer, Integer> {
 private static final long serialVersionUID = 1L;
 	@Override
@@ -1209,23 +1209,23 @@ private static final long serialVersionUID = 1L;
 DataSet<Integer> input = // [...]
 DataSet<Integer> output = input.distinct(new AbsSelector());
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input: DataSet[Int] = // [...]
 val output = input.distinct {x => Math.abs(x)}
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1235,7 +1235,7 @@ Not supported.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // some ordinary POJO
 public class CustomType {
   public String aName;
@@ -1246,26 +1246,26 @@ public class CustomType {
 DataSet<CustomType> input = // [...]
 DataSet<CustomType> output = input.distinct("aName", "aNumber");
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 // some ordinary POJO
 case class CustomType(aName : String, aNumber : Int) { }
 
 val input: DataSet[CustomType] = // [...]
 val output = input.distinct("aName", "aNumber")
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1275,28 +1275,28 @@ It is also possible to indicate to use all the fields by the wildcard character:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<CustomType> input = // [...]
 DataSet<CustomType> output = input.distinct("*");
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 // some ordinary POJO
 val input: DataSet[CustomType] = // [...]
 val output = input.distinct("_")
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1321,7 +1321,7 @@ The following code shows a default Join transformation using field position keys
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 public static class User { public String name; public int zip; }
 public static class Store { public Manager mgr; public int zip; }
 DataSet<User> input1 = // [...]
@@ -1331,23 +1331,23 @@ DataSet<Tuple2<User, Store>>
             result = input1.join(input2)
                            .where("zip")       // key of the first input (users)
                            .equalTo("zip");    // key of the second input (stores)
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input1: DataSet[(Int, String)] = // [...]
 val input2: DataSet[(Double, Int)] = // [...]
 val result = input1.join(input2).where(0).equalTo(1)
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  result = input1.join(input2).where(0).equal_to(1)
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1362,7 +1362,7 @@ The following code performs a join of DataSet with custom java objects and a Tup
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // some POJO
 public class Rating {
   public String name;
@@ -1395,12 +1395,12 @@ DataSet<Tuple2<String, Double>>
 
                    // applying the JoinFunction on joining pairs
                    .with(new PointWeighter());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 case class Rating(name: String, category: String, points: Int)
 
 val ratings: DataSet[Rating] = // [...]
@@ -1409,12 +1409,12 @@ val weights: DataSet[(String, Double)] = // [...]
 val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
   (rating, weight) => (rating.name, rating.points * weight._2)
 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  class PointWeighter(JoinFunction):
    def join(self, rating, weight):
      return (rating[0], rating[1] * weight[1])
@@ -1423,7 +1423,7 @@ val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
  weightedRatings =
   ratings.join(weights).where(0).equal_to(0) \
   .using(PointWeighter())
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1437,7 +1437,7 @@ return (collect), zero, one, or more elements.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 public class PointWeighter
          implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
   @Override
@@ -1452,12 +1452,12 @@ public class PointWeighter
 DataSet<Tuple2<String, Double>>
             weightedRatings =
             ratings.join(weights) // [...]
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 case class Rating(name: String, category: String, points: Int)
 
 val ratings: DataSet[Rating] = // [...]
@@ -1468,7 +1468,7 @@ val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
     if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
 }
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
@@ -1483,7 +1483,7 @@ A Join transformation can construct result tuples using a projection as shown he
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
 DataSet<Tuple2<Integer, Double>> input2 = // [...]
 DataSet<Tuple4<Integer, String, Double, Byte>>
@@ -1495,7 +1495,7 @@ DataSet<Tuple4<Integer, String, Double, Byte>
                   .equalTo(0)
                   // select and reorder fields of matching tuples
                   .projectFirst(0,2).projectSecond(1).projectFirst(1);
-~~~
+{% endhighlight %}
 
 `projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output Tuple. The order of indexes defines the order of fields in the output tuple.
 The join projection also works for non-Tuple DataSets. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output Tuple.
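 
 A minimal sketch of such a projection on a non-Tuple first input, reusing the `CustomType` POJO from the Distinct examples above (the chosen key field is an assumption):
 
 {% highlight java %}
 DataSet<CustomType> input1 = // [...]
 DataSet<Tuple2<Integer, Double>> input2 = // [...]
 DataSet<Tuple2<CustomType, Double>>
             result =
             input1.join(input2)
                   .where("aNumber")
                   .equalTo(0)
                   // no arguments: the whole non-Tuple element becomes one field of the output Tuple
                   .projectFirst().projectSecond(1);
 {% endhighlight %}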
@@ -1503,17 +1503,17 @@ The join projection works also for non-Tuple DataSets. In this case, `projectFir
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  result = input1.join(input2).where(0).equal_to(0) \
   .project_first(0,2).project_second(1).project_first(1);
-~~~
+{% endhighlight %}
 
 `project_first(int...)` and `project_second(int...)` select the fields of the first and second joined input that should be assembled into an output Tuple. The order of indexes defines the order of fields in the output tuple.
 The join projection also works for non-Tuple DataSets. In this case, `project_first()` or `project_second()` must be called without arguments to add a joined element to the output Tuple.
@@ -1528,7 +1528,7 @@ In order to guide the optimizer to pick the right execution strategy, you can hi
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<Integer, String>> input1 = // [...]
 DataSet<Tuple2<Integer, String>> input2 = // [...]
 
@@ -1545,12 +1545,12 @@ DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
             input1.joinWithHuge(input2)
                   .where(0)
                   .equalTo(0);
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input1: DataSet[(Int, String)] = // [...]
 val input2: DataSet[(Int, String)] = // [...]
 
@@ -1560,12 +1560,12 @@ val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)
 // hint that the second DataSet is very large
 val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 
  #hint that the second DataSet is very small
  result1 = input1.join_with_tiny(input2).where(0).equal_to(0)
@@ -1573,7 +1573,7 @@ val result1 = input1.joinWithHuge(input2).where(0).equalTo(0)
  #hint that the second DataSet is very large
  result1 = input1.join_with_huge(input2).where(0).equal_to(0)
 
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1587,33 +1587,33 @@ to manually pick a strategy, in case you want to enforce a specific way of execu
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<SomeType> input1 = // [...]
 DataSet<AnotherType> input2 = // [...]
 
 DataSet<Tuple2<SomeType, AnotherType>> result =
       input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
             .where("id").equalTo("key");
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input1: DataSet[SomeType] = // [...]
 val input2: DataSet[AnotherType] = // [...]
 
 // hint that the second DataSet is very small
 val result1 = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1668,7 +1668,7 @@ The following code performs a left outer join of DataSet with custom java object
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // some POJO
 public class Rating {
   public String name;
@@ -1702,12 +1702,12 @@ DataSet<Tuple2<String, Integer>>
 
                    // applying the JoinFunction on joining pairs
                    .with(new PointAssigner());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 case class Rating(name: String, category: String, points: Int)
 
 val movies: DataSet[(String, String)] = // [...]
@@ -1716,14 +1716,14 @@ val ratings: DataSet[Ratings] = // [...]
 val moviesWithPoints = movies.leftOuterJoin(ratings).where(0).equalTo("name") {
   (movie, rating) => (movie._1, if (rating == null) -1 else rating.points)
 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1737,7 +1737,7 @@ return (collect), zero, one, or more elements.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 public class PointAssigner
          implements FlatJoinFunction<Tuple2<String, String>, Rating, Tuple2<String, Integer>> {
   @Override
@@ -1755,21 +1755,21 @@ public class PointAssigner
 DataSet<Tuple2<String, Integer>>
             moviesWithPoints =
             movies.leftOuterJoin(ratings) // [...]
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1783,7 +1783,7 @@ to manually pick a strategy, in case you want to enforce a specific way of execu
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<SomeType> input1 = // [...]
 DataSet<AnotherType> input2 = // [...]
 
@@ -1794,12 +1794,12 @@ DataSet<Tuple2<SomeType, AnotherType> result1 =
 DataSet<Tuple2<SomeType, AnotherType>> result2 =
       input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
             .where("id").equalTo("key");
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input1: DataSet[SomeType] = // [...]
 val input2: DataSet[AnotherType] = // [...]
 
@@ -1808,14 +1808,14 @@ val result1 = input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE).wher
 
 val result2 = input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST).where("id").equalTo("key")
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -1878,7 +1878,7 @@ The following code shows how to apply a Cross transformation on two DataSets usi
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 public class Coord {
   public int id;
   public int x;
@@ -1904,13 +1904,13 @@ DataSet<Tuple3<Integer, Integer, Double>>
             coords1.cross(coords2)
                    // apply CrossFunction
                    .with(new EuclideanDistComputer());
-~~~
+{% endhighlight %}
 
 #### Cross with Projection
 
 A Cross transformation can also construct result tuples using a projection as shown here:
 
-~~~java
+{% highlight java %}
 DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
 DataSet<Tuple2<Integer, Double>> input2 = // [...]
 DataSet<Tuple4<Integer, Byte, Integer, Double>>
@@ -1918,14 +1918,14 @@ DataSet<Tuple4<Integer, Byte, Integer, Double>
             input1.cross(input2)
                   // select and reorder fields of matching tuples
                   .projectSecond(0).projectFirst(1,0).projectSecond(1);
-~~~
+{% endhighlight %}
 
 The field selection in a Cross projection works the same way as in the projection of Join results.
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 case class Coord(id: Int, x: Int, y: Int)
 
 val coords1: DataSet[Coord] = // [...]
@@ -1936,27 +1936,27 @@ val distances = coords1.cross(coords2) {
     val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
     (c1.id, c2.id, dist)
 }
-~~~
+{% endhighlight %}
 
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  class Euclid(CrossFunction):
    def cross(self, c1, c2):
     return (c1[0], c2[0], sqrt(pow(c1[1] - c2[1], 2) + pow(c1[2] - c2[2], 2)))
 
  distances = coords1.cross(coords2).using(Euclid())
-~~~
+{% endhighlight %}
 
 #### Cross with Projection
 
 A Cross transformation can also construct result tuples using a projection as shown here:
 
-~~~python
+{% highlight python %}
 result = input1.cross(input2).project_first(1,0).project_second(0,1)
-~~~
+{% endhighlight %}
 
 The field selection in a Cross projection works the same way as in the projection of Join results.
 
@@ -1970,7 +1970,7 @@ In order to guide the optimizer to pick the right execution strategy, you can hi
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<Integer, String>> input1 = // [...]
 DataSet<Tuple2<Integer, String>> input2 = // [...]
 
@@ -1987,12 +1987,12 @@ DataSet<Tuple3<Integer, Integer, String>>
             input1.crossWithHuge(input2)
                   // apply a projection (or any Cross function)
                   .projectFirst(0,1).projectSecond(1);
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val input1: DataSet[(Int, String)] = // [...]
 val input2: DataSet[(Int, String)] = // [...]
 
@@ -2002,19 +2002,19 @@ val result1 = input1.crossWithTiny(input2)
 // hint that the second DataSet is very large
 val result1 = input1.crossWithHuge(input2)
 
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  #hint that the second DataSet is very small
  result1 = input1.cross_with_tiny(input2)
 
  #hint that the second DataSet is very large
  result1 = input1.cross_with_huge(input2)
 
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -2033,7 +2033,7 @@ Similar to Reduce, GroupReduce, and Join, keys can be defined using the differen
 
 The example shows how to group by Field Position Keys (Tuple DataSets only). You can do the same with Pojo types and key expressions; a short sketch with expression keys follows the examples below.
 
-~~~java
+{% highlight java %}
 // Some CoGroupFunction definition
 class MyCoGrouper
          implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
@@ -2069,12 +2069,12 @@ DataSet<Double> output = iVals.coGroup(dVals)
                          .equalTo(0)
                          // apply CoGroup function on each pair of groups
                          .with(new MyCoGrouper());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val iVals: DataSet[(String, Int)] = // [...]
 val dVals: DataSet[(String, Double)] = // [...]
 
@@ -2088,12 +2088,12 @@ val output = iVals.coGroup(dVals).where(0).equalTo(0) {
       }
     }
 }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  class CoGroup(CoGroupFunction):
    def co_group(self, ivals, dvals, collector):
      ints = dict()
@@ -2107,7 +2107,7 @@ val output = iVals.coGroup(dVals).where(0).equalTo(0) {
 
 
  output = ivals.co_group(dvals).where(0).equal_to(0).using(CoGroup())
-~~~
+{% endhighlight %}
 
 </div>
 </div>
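 
 As a rough sketch of the Pojo/expression-key variant mentioned above, reusing the `User` and `Store` POJOs from the Join examples (the counting logic is illustrative only):
 
 {% highlight java %}
 DataSet<User> users = // [...]
 DataSet<Store> stores = // [...]
 
 // count users and stores per zip code
 DataSet<Tuple3<Integer, Integer, Integer>> counts = users.coGroup(stores)
         // expression keys instead of field positions
         .where("zip")
         .equalTo("zip")
         .with(new CoGroupFunction<User, Store, Tuple3<Integer, Integer, Integer>>() {
             @Override
             public void coGroup(Iterable<User> us, Iterable<Store> ss,
                                 Collector<Tuple3<Integer, Integer, Integer>> out) {
                 int zip = -1;
                 int numUsers = 0;
                 int numStores = 0;
                 for (User u : us) { zip = u.zip; numUsers++; }
                 for (Store s : ss) { zip = s.zip; numStores++; }
                 out.collect(new Tuple3<>(zip, numUsers, numStores));
             }
         });
 {% endhighlight %}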
@@ -2120,30 +2120,30 @@ Produces the union of two DataSets, which have to be of the same type. A union o
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<String, Integer>> vals1 = // [...]
 DataSet<Tuple2<String, Integer>> vals2 = // [...]
 DataSet<Tuple2<String, Integer>> vals3 = // [...]
 DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2).union(vals3);
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val vals1: DataSet[(String, Int)] = // [...]
 val vals2: DataSet[(String, Int)] = // [...]
 val vals3: DataSet[(String, Int)] = // [...]
 
 val unioned = vals1.union(vals2).union(vals3)
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
  unioned = vals1.union(vals2).union(vals3)
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -2154,28 +2154,28 @@ Evenly rebalances the parallel partitions of a DataSet to eliminate data skew.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<String> in = // [...]
 // rebalance DataSet and apply a Map transformation.
 DataSet<Tuple2<String, String>> out = in.rebalance()
                                         .map(new Mapper());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val in: DataSet[String] = // [...]
 // rebalance DataSet and apply a Map transformation.
 val out = in.rebalance().map { ... }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -2189,28 +2189,28 @@ Keys can be specified as position keys, expression keys, and key selector functi
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<String, Integer>> in = // [...]
 // hash-partition DataSet by String value and apply a MapPartition transformation.
 DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
                                         .mapPartition(new PartitionMapper());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val in: DataSet[(String, Int)] = // [...]
 // hash-partition DataSet by String value and apply a MapPartition transformation.
 val out = in.partitionByHash(0).mapPartition { ... }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -2223,28 +2223,28 @@ Keys can be specified as position keys, expression keys, and key selector functi
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<String, Integer>> in = // [...]
 // range-partition DataSet by String value and apply a MapPartition transformation.
 DataSet<Tuple2<String, String>> out = in.partitionByRange(0)
                                         .mapPartition(new PartitionMapper());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val in: DataSet[(String, Int)] = // [...]
 // range-partition DataSet by String value and apply a MapPartition transformation.
 val out = in.partitionByRange(0).mapPartition { ... }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -2259,7 +2259,7 @@ Partitions can be sorted on multiple fields by chaining `sortPartition()` calls.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<String, Integer>> in = // [...]
 // Locally sort partitions in ascending order on the second String field and
 // in descending order on the first String field.
@@ -2267,12 +2267,12 @@ DataSet<Tuple2<String, Integer>> in = // [...]
 DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
                                         .sortPartition(0, Order.DESCENDING)
                                         .mapPartition(new PartitionMapper());
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val in: DataSet[(String, Int)] = // [...]
 // Locally sort partitions in ascending order on the second String field and
 // in descending order on the first String field.
@@ -2280,14 +2280,14 @@ val in: DataSet[(String, Int)] = // [...]
 val out = in.sortPartition(1, Order.ASCENDING)
             .sortPartition(0, Order.DESCENDING)
             .mapPartition { ... }
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>
@@ -2299,7 +2299,7 @@ Returns the first n (arbitrary) elements of a DataSet. First-n can be applied on
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 DataSet<Tuple2<String, Integer>> in = // [...]
 // Return the first five (arbitrary) elements of the DataSet
 DataSet<Tuple2<String, Integer>> out1 = in.first(5);
@@ -2312,12 +2312,12 @@ DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
 DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
                                           .sortGroup(1, Order.ASCENDING)
                                           .first(3);
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val in: DataSet[(String, Int)] = // [...]
 // Return the first five (arbitrary) elements of the DataSet
 val out1 = in.first(5)
@@ -2327,14 +2327,14 @@ val out2 = in.groupBy(0).first(2)
 
 // Return the first three elements of each String group ordered by the Integer field
 val out3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="python" markdown="1">
 
-~~~python
+{% highlight python %}
 Not supported.
-~~~
+{% endhighlight %}
 
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/batch/examples.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/examples.md b/docs/dev/batch/examples.md
index a4b2826..90e372d 100644
--- a/docs/dev/batch/examples.md
+++ b/docs/dev/batch/examples.md
@@ -44,17 +44,17 @@ Each binary release of Flink contains an `examples` directory with jar files for
 
 To run the WordCount example, issue the following command:
 
-~~~bash
+{% highlight bash %}
 ./bin/flink run ./examples/batch/WordCount.jar
-~~~
+{% endhighlight %}
 
 The other examples can be started in a similar way.
 
 Note that many examples can be run without passing any arguments, by using built-in data. To run WordCount with real data, you have to pass the path to the data:
 
-~~~bash
+{% highlight bash %}
 ./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
-~~~
+{% endhighlight %}
 
 Note that non-local file systems require a schema prefix, such as `hdfs://`.
 
@@ -65,7 +65,7 @@ WordCount is the "Hello World" of Big Data processing systems. It computes the f
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 DataSet<String> text = env.readTextFile("/path/to/file");
@@ -95,14 +95,14 @@ public static class Tokenizer implements FlatMapFunction<String, Tuple2<String,
         }
     }
 }
-~~~
+{% endhighlight %}
 
 The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java  "WordCount example" %} implements the above described algorithm with input parameters: `--input <path> --output <path>`. As test data, any text file will do.
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val env = ExecutionEnvironment.getExecutionEnvironment
 
 // get input data
@@ -114,7 +114,7 @@ val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
   .sum(1)
 
 counts.writeAsCsv(outputPath, "\n", " ")
-~~~
+{% endhighlight %}
 
 The {% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala  "WordCount example" %} implements the above described algorithm with input parameters: `--input <path> --output <path>`. As test data, any text file will do.
 
@@ -131,7 +131,7 @@ In this simple example, PageRank is implemented with a [bulk iteration](iteratio
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // read the pages and initial ranks by parsing a CSV file
@@ -202,7 +202,7 @@ public static final class EpsilonFilter
         return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
     }
 }
-~~~
+{% endhighlight %}
 
 The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java "PageRank program" %} implements the above example.
 It requires the following parameters to run: `--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>`.
@@ -210,7 +210,7 @@ It requires the following parameters to run: `--pages <path> --links <path> --ou
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 // User-defined types
 case class Link(sourceId: Long, targetId: Long)
 case class Page(pageId: Long, rank: Double)
@@ -269,7 +269,7 @@ val result = finalRanks
 
 // emit result
 result.writeAsCsv(outputPath, "\n", " ")
-~~~
+{% endhighlight %}
 
 The {% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala "PageRank program" %} implements the above example.
 It requires the following parameters to run: `--pages <path> --links <path> --output <path> --numPages <n> --iterations <n>`.
@@ -293,7 +293,7 @@ This implementation uses a [delta iteration](iterations.html): Vertices that hav
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // read vertex and edge data
 DataSet<Long> vertices = getVertexDataSet(env);
 DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
@@ -365,14 +365,14 @@ public static final class ComponentIdFilter
         }
     }
 }
-~~~
+{% endhighlight %}
 
 The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `--vertices <path> --edges <path> --output <path> --iterations <n>`.
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 // set up execution environment
 val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -408,7 +408,7 @@ val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Arra
 
 verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
 
-~~~
+{% endhighlight %}
 
 The {% gh_link /flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `--vertices <path> --edges <path> --output <path> --iterations <n>`.
 </div>
@@ -426,7 +426,7 @@ The Relational Query example assumes two tables, one with `orders` and the other
 
 The example implements the following SQL query.
 
-~~~sql
+{% highlight sql %}
 SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
     FROM orders, lineitem
 WHERE l_orderkey = o_orderkey
@@ -434,14 +434,14 @@ WHERE l_orderkey = o_orderkey
     AND YEAR(o_orderdate) > 1993
     AND o_orderpriority LIKE "5%"
 GROUP BY l_orderkey, o_shippriority;
-~~~
+{% endhighlight %}
 
 The Flink program, which implements the above query, looks as follows.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
 DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
 // get lineitem data set: (orderkey, extendedprice)
@@ -484,7 +484,7 @@ DataSet<Tuple3<Integer, Integer, Double>> priceSums =
 
 // emit result
 priceSums.writeAsCsv(outputPath);
-~~~
+{% endhighlight %}
 
 The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java "Relational Query program" %} implements the above query. It requires the following parameters to run: `--orders <path> --lineitem <path> --output <path>`.
 
@@ -503,19 +503,19 @@ Take the following steps to generate arbitrary large input files for the provide
 1.  Download and unpack DBGEN
 2.  Make a copy of *makefile.suite* called *Makefile* and perform the following changes:
 
-~~~bash
+{% highlight bash %}
 DATABASE = DB2
 MACHINE  = LINUX
 WORKLOAD = TPCH
 CC       = gcc
-~~~
+{% endhighlight %}
 
 3.  Build DBGEN using *make*
 4.  Generate lineitem and orders relations using dbgen. A scale factor
     (-s) of 1 results in a generated data set of about 1 GB in size.
 
-~~~bash
+{% highlight bash %}
 ./dbgen -T o -s 1
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/batch/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/fault_tolerance.md b/docs/dev/batch/fault_tolerance.md
index f403791..ee1e797 100644
--- a/docs/dev/batch/fault_tolerance.md
+++ b/docs/dev/batch/fault_tolerance.md
@@ -59,9 +59,9 @@ env.setNumberOfExecutionRetries(3)
 
 You can also define default values for the number of execution retries and the retry delay in the `flink-conf.yaml`:
 
-~~~
+{% highlight yaml %}
 execution-retries.default: 3
-~~~
+{% endhighlight %}
 
 
 Retry Delays
@@ -91,8 +91,8 @@ env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
 
 You can also define the default value for the retry delay in the `flink-conf.yaml`:
 
-~~~
+{% highlight yaml %}
 execution-retries.delay: 10 s
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/batch/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/hadoop_compatibility.md b/docs/dev/batch/hadoop_compatibility.md
index 9f1478a2..4eba2a8 100644
--- a/docs/dev/batch/hadoop_compatibility.md
+++ b/docs/dev/batch/hadoop_compatibility.md
@@ -56,13 +56,13 @@ package.
 Add the following dependency to your `pom.xml` if you want to reuse Mappers
 and Reducers.
 
-~~~xml
+{% highlight xml %}
 <dependency>
 	<groupId>org.apache.flink</groupId>
 	<artifactId>flink-hadoop-compatibility{{ site.scala_version_suffix }}</artifactId>
 	<version>{{site.version}}</version>
 </dependency>
-~~~
+{% endhighlight %}
 
 ### Using Hadoop Data Types
 
@@ -88,7 +88,7 @@ The following example shows how to use Hadoop's `TextInputFormat`.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 DataSet<Tuple2<LongWritable, Text>> input =
@@ -96,12 +96,12 @@ DataSet<Tuple2<LongWritable, Text>> input =
 
 // Do something with the data.
 [...]
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val env = ExecutionEnvironment.getExecutionEnvironment
 
 val input: DataSet[(LongWritable, Text)] =
@@ -109,7 +109,7 @@ val input: DataSet[(LongWritable, Text)] =
 
 // Do something with the data.
 [...]
-~~~
+{% endhighlight %}
 
 </div>
 
@@ -128,7 +128,7 @@ The following example shows how to use Hadoop's `TextOutputFormat`.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 // Obtain the result we want to emit
 DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
 
@@ -144,12 +144,12 @@ TextOutputFormat.setOutputPath(job, new Path(outputPath));
 
 // Emit data using the Hadoop TextOutputFormat.
 hadoopResult.output(hadoopOF);
-~~~
+{% endhighlight %}
 
 </div>
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 // Obtain your result to emit.
 val hadoopResult: DataSet[(Text, IntWritable)] = [...]
 
@@ -163,7 +163,7 @@ FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
 hadoopResult.output(hadoopOF)
 
 
-~~~
+{% endhighlight %}
 
 </div>
 
@@ -185,7 +185,7 @@ and can be used as regular Flink [FlatMapFunctions](dataset_transformations.html
 
 The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
 
-~~~java
+{% highlight java %}
 // Obtain data to process somehow.
 DataSet<Tuple2<Text, LongWritable>> text = [...]
 
@@ -199,7 +199,7 @@ DataSet<Tuple2<Text, LongWritable>> result = text
   .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
     new Counter(), new Counter()
   ));
-~~~
+{% endhighlight %}
 
 **Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
 
@@ -207,7 +207,7 @@ DataSet<Tuple2<Text, LongWritable>> result = text
 
 The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
 
-~~~java
+{% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // Set up the Hadoop TextInputFormat.
@@ -245,6 +245,6 @@ result.output(hadoopOF);
 
 // Execute Program
 env.execute("Hadoop WordCount");
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/batch/iterations.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/iterations.md b/docs/dev/batch/iterations.md
index f5b65ac..be0ebb9 100644
--- a/docs/dev/batch/iterations.md
+++ b/docs/dev/batch/iterations.md
@@ -102,7 +102,7 @@ There are multiple options to specify **termination conditions** for an iteratio
 
 You can also think about the iterate operator in pseudo-code:
 
-~~~java
+{% highlight java %}
 IterationState state = getInitialState();
 
 while (!terminationCriterion()) {
@@ -110,7 +110,7 @@ while (!terminationCriterion()) {
 }
 
 setFinalState(state);
-~~~
+{% endhighlight %}
 
 <div class="panel panel-default">
 	<div class="panel-body">
@@ -130,14 +130,14 @@ In the following example, we **iteratively increment a set numbers**:
   3. **Next Partial Solution**: The output of the step function will be the output of the map operator, i.e. records with incremented integers.
   4. **Iteration Result**: After ten iterations, the initial numbers will have been incremented ten times, resulting in integers `11` to `15`.
 
-~~~
+{% highlight plain %}
 // 1st           2nd                       10th
 map(1) -> 2      map(2) -> 3      ...      map(10) -> 11
 map(2) -> 3      map(3) -> 4      ...      map(11) -> 12
 map(3) -> 4      map(4) -> 5      ...      map(12) -> 13
 map(4) -> 5      map(5) -> 6      ...      map(13) -> 14
 map(5) -> 6      map(6) -> 7      ...      map(14) -> 15
-~~~
+{% endhighlight %}
 
 Note that **1**, **2**, and **4** can be arbitrary data flows.
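 
 A minimal Java sketch of this increment iteration (names are illustrative; the pattern mirrors the pseudo-code above):
 
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // 1. Iteration Input: the numbers 1 to 5, iterated at most ten times
 IterativeDataSet<Integer> initial = env.fromElements(1, 2, 3, 4, 5).iterate(10);
 
 // 2./3. Step Function and Next Partial Solution: increment every record by one
 DataSet<Integer> iteration = initial.map(new MapFunction<Integer, Integer>() {
     @Override
     public Integer map(Integer value) {
         return value + 1;
     }
 });
 
 // 4. Iteration Result: 11 to 15 after ten iterations
 DataSet<Integer> result = initial.closeWith(iteration);
 result.print();
 {% endhighlight %}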
 
@@ -162,7 +162,7 @@ The default **termination condition** for delta iterations is specified by the *
 
 You can also think about the iterate operator in pseudo-code:
 
-~~~java
+{% highlight java %}
 IterationState workset = getInitialState();
 IterationState solution = getInitialSolution();
 
@@ -173,7 +173,7 @@ while (!terminationCriterion()) {
 }
 
 setFinalState(solution);
-~~~
+{% endhighlight %}
 
 <div class="panel panel-default">
 	<div class="panel-body">

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/dev/best_practices.md b/docs/dev/best_practices.md
index 24e7091..e8dee30 100644
--- a/docs/dev/best_practices.md
+++ b/docs/dev/best_practices.md
@@ -149,20 +149,20 @@ Also, POJOs can be used to give large `Tuple`-types a name.
 Instead of using:
 
 
-~~~java
+{% highlight java %}
 Tuple11<String, String, ..., String> var = new ...;
-~~~
+{% endhighlight %}
 
 
 It is much easier to create a custom type extending from the large Tuple type.
 
-~~~java
+{% highlight java %}
 CustomType var = new ...;
 
 public static class CustomType extends Tuple11<String, String, ..., String> {
     // constructor matching super
 }
-~~~
+{% endhighlight %}
 
 ## Using Logback instead of Log4j
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/cluster_execution.md
----------------------------------------------------------------------
diff --git a/docs/dev/cluster_execution.md b/docs/dev/cluster_execution.md
index f1d84e1..7ea8c22 100644
--- a/docs/dev/cluster_execution.md
+++ b/docs/dev/cluster_execution.md
@@ -47,19 +47,19 @@ execute the program.
 If you are developing your program as a Maven project, you have to add the
 `flink-clients` module using this dependency:
 
-~~~xml
+{% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
   <version>{{ site.version }}</version>
 </dependency>
-~~~
+{% endhighlight %}
 
 ### Example
 
 The following illustrates the use of the `RemoteEnvironment`:
 
-~~~java
+{% highlight java %}
 public static void main(String[] args) throws Exception {
     ExecutionEnvironment env = ExecutionEnvironment
         .createRemoteEnvironment("flink-master", 6123, "/home/user/udfs.jar");
@@ -76,7 +76,7 @@ public static void main(String[] args) throws Exception {
 
     env.execute();
 }
-~~~
+{% endhighlight %}
 
 Note that the program contains custom user code and hence requires a JAR file with
 the classes of the code attached. The constructor of the remote environment

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index 4e1f68a..4a00322 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -125,9 +125,9 @@ input.addSink(sink)
 
 This will create a sink that writes to bucket files that follow this schema:
 
-```
+{% highlight plain %}
 /base/path/{date-time}/part-{parallel-task}-{count}
-```
+{% endhighlight %}
 
 Where `date-time` is the string that we get from the date/time format, `parallel-task` is the index
 of the parallel sink instance and `count` is the running number of part files that were created

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/datastream_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md
index 6f30f29..be23804 100644
--- a/docs/dev/datastream_api.md
+++ b/docs/dev/datastream_api.md
@@ -125,9 +125,9 @@ object WindowWordCount {
 
 To run the example program, start the input stream with netcat first from a terminal:
 
-~~~bash
+{% highlight bash %}
 nc -lk 9999
-~~~
+{% endhighlight %}
 
 Just type some words, hitting return after each word. These will be the input to the
 word count program. If you want to see counts greater than 1, type the same word again and again within

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/java8.md
----------------------------------------------------------------------
diff --git a/docs/dev/java8.md b/docs/dev/java8.md
index 4949833..8e7e643 100644
--- a/docs/dev/java8.md
+++ b/docs/dev/java8.md
@@ -38,19 +38,19 @@ Flink API, please refer to the [Programming Guide]({{ site.baseurl }}/dev/api_co
 The following example illustrates how to implement a simple, inline `map()` function that squares its input using a Lambda Expression.
 The types of the input `i` and of the output parameters of the `map()` function need not be declared, as they are inferred by the Java 8 compiler.
 
-~~~java
+{% highlight java %}
 env.fromElements(1, 2, 3)
 // returns the squared i
 .map(i -> i*i)
 .print();
-~~~
+{% endhighlight %}
 
 The next two examples show different implementations of a function that uses a `Collector` for output.
 Functions, such as `flatMap()`, require an output type (in this case `String`) to be defined for the `Collector` in order to be type-safe.
 If the `Collector` type cannot be inferred from the surrounding context, it needs to be declared manually in the Lambda Expression's parameter list.
 Otherwise, the output will be treated as type `Object`, which can lead to undesired behaviour.
 
-~~~java
+{% highlight java %}
 DataSet<Integer> input = env.fromElements(1, 2, 3);
 
 // collector type must be declared
@@ -63,9 +63,9 @@ input.flatMap((Integer number, Collector<String> out) -> {
 })
 // returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
 .print();
-~~~
+{% endhighlight %}
 
-~~~java
+{% highlight java %}
 DataSet<Integer> input = env.fromElements(1, 2, 3);
 
 // collector type must not be declared, it is inferred from the type of the dataset
@@ -79,11 +79,11 @@ DataSet<String> manyALetters = input.flatMap((number, out) -> {
 
 // returns (on separate lines) "a", "a", "aa", "a", "aa", "aaa"
 manyALetters.print();
-~~~
+{% endhighlight %}
 
 The following code demonstrates a word count which makes extensive use of Lambda Expressions.
 
-~~~java
+{% highlight java %}
 DataSet<String> input = env.fromElements("Please count", "the words", "but not this");
 
 // filter out strings that contain "not"
@@ -98,7 +98,7 @@ input.filter(line -> !line.contains("not"))
 .groupBy(0).sum(1)
 // print
 .print();
-~~~
+{% endhighlight %}
 
 ### Compiler Limitations
 Currently, Flink only supports jobs containing Lambda Expressions completely if they are **compiled with the Eclipse JDT compiler contained in Eclipse Luna 4.4.2 (and above)**.
@@ -118,7 +118,7 @@ If you are using a different IDE such as IntelliJ IDEA or you want to package yo
 
 Alternatively, you can manually add the following lines to your Maven `pom.xml` file. Maven will then use the Eclipse JDT compiler for compilation.
 
-~~~xml
+{% highlight xml %}
 <!-- put these lines under "project/build/pluginManagement/plugins" of your pom.xml -->
 
 <plugin>
@@ -138,11 +138,11 @@ Alternatively, you can manually insert the following lines to your Maven `pom.xm
         </dependency>
     </dependencies>
 </plugin>
-~~~
+{% endhighlight %}
 
 If you are using Eclipse for development, the m2e plugin might complain about the inserted lines above and mark your `pom.xml` as invalid. If so, add the following lines to your `pom.xml`.
 
-~~~xml
+{% highlight xml %}
 <!-- put these lines under "project/build/pluginManagement/plugins/plugin[groupId="org.eclipse.m2e", artifactId="lifecycle-mapping"]/configuration/lifecycleMappingMetadata/pluginExecutions" of your pom.xml -->
 
 <pluginExecution>
@@ -159,7 +159,7 @@ If you are using Eclipse for development, the m2e plugin might complain about th
         <ignore></ignore>
     </action>
 </pluginExecution>
-~~~
+{% endhighlight %}
 
 #### Run and debug Flink jobs within the Eclipse IDE
 
@@ -171,17 +171,17 @@ If you are using Maven, you also need to change the Java version in your `pom.xm
 
 The Eclipse JDT compiler needs a special compiler flag in order to store type information in `.class` files. Open the JDT configuration file at `{project directory}/.settings/org.eclipse.jdt.core.prefs` with your favorite text editor and add the following line:
 
-~~~
+{% highlight plain %}
 org.eclipse.jdt.core.compiler.codegen.lambda.genericSignature=generate
-~~~
+{% endhighlight %}
 
 If not already done, also modify the Java versions of the following properties to `1.8` (or above):
 
-~~~
+{% highlight plain %}
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
 org.eclipse.jdt.core.compiler.compliance=1.8
 org.eclipse.jdt.core.compiler.source=1.8
-~~~
+{% endhighlight %}
 
 After you have saved the file, perform a complete project refresh in Eclipse IDE.
 
@@ -189,10 +189,10 @@ If you are using Maven, right click your Eclipse project and select `Maven` -> `
 
 You have configured everything correctly if the following Flink program runs without exceptions:
 
-~~~java
+{% highlight java %}
 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 env.fromElements(1, 2, 3).map((in) -> new Tuple1<String>(" " + in)).print();
 env.execute();
-~~~
+{% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index a6136d11..29a045a 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1454,7 +1454,7 @@ parameters
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 
-~~~java
+{% highlight java %}
 PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
 OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
@@ -1474,13 +1474,13 @@ SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
 );
 
 DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
-~~~
+{% endhighlight %}
 
 </div>
 
 <div data-lang="scala" markdown="1">
 
-~~~scala
+{% highlight scala %}
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
 val outputTag = OutputTag[String]("side-output")
@@ -1492,12 +1492,12 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outp
 }
 
 val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)
-~~~
+{% endhighlight %}
 
 The `flatSelect` API call offers the same overloaded version, which takes a timeout function as the first parameter and a selection function as the second parameter.
 In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`. You can use the collector to emit an arbitrary number of events.
 
-~~~scala
+{% highlight scala %}
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
 val outputTag = OutputTag[String]("side-output")
@@ -1511,7 +1511,7 @@ val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(
 }
 
 val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)
-~~~
+{% endhighlight %}
 
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 7ae7968..2a08ac2 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -76,35 +76,35 @@ in the **opt** directory (for versions older than Flink 1.2 these can be manuall
 [Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)). To run the Gelly examples, the **flink-gelly** (for
 Java) or **flink-gelly-scala** (for Scala) jar must be copied to Flink's **lib** directory.
 
-~~~bash
+{% highlight bash %}
 cp opt/flink-gelly_*.jar lib/
 cp opt/flink-gelly-scala_*.jar lib/
-~~~
+{% endhighlight %}
 
 Gelly's examples jar includes drivers for each of the library methods and is provided in the **examples** directory.
 After configuring and starting the cluster, list the available algorithm classes:
 
-~~~bash
+{% highlight bash %}
 ./bin/start-cluster.sh
 ./bin/flink run examples/gelly/flink-gelly-examples_*.jar
-~~~
+{% endhighlight %}
 
 The Gelly drivers can generate graph data or read the edge list from a CSV file (each node in a cluster must have access
 to the input file). The algorithm description, available inputs and outputs, and configuration are displayed when an
 algorithm is selected. Print usage for [JaccardIndex](./library_methods.html#jaccard-index):
 
-~~~bash
+{% highlight bash %}
 ./bin/flink run examples/gelly/flink-gelly-examples_*.jar --algorithm JaccardIndex
-~~~
+{% endhighlight %}
 
 Display [graph metrics](./library_methods.html#metric) for a million vertex graph:
 
-~~~bash
+{% highlight bash %}
 ./bin/flink run examples/gelly/flink-gelly-examples_*.jar \
     --algorithm GraphMetrics --order directed \
     --input RMatGraph --type integer --scale 20 --simplify directed \
     --output print
-~~~
+{% endhighlight %}
 
 The size of the graph is adjusted by the *\-\-scale* and *\-\-edge_factor* parameters. The
 [library generator](./graph_generators.html#rmat-graph) provides access to additional configuration to adjust the
@@ -114,7 +114,7 @@ Sample social network data is provided by the [Stanford Network Analysis Project
 The [com-lj](http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz) data set is a good starter size.
 Run a few algorithms and monitor the job progress in Flink's Web UI:
 
-~~~bash
+{% highlight bash %}
 wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
 
 ./bin/flink run -q examples/gelly/flink-gelly-examples_*.jar \
@@ -131,7 +131,7 @@ wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.g
     --algorithm JaccardIndex \
     --input CSV --type integer --simplify undirected --input_filename com-lj.ungraph.txt --input_field_delimiter $'\t' \
     --output hash
-~~~
+{% endhighlight %}
 
 Please submit feature requests and report issues on the user [mailing list](https://flink.apache.org/community.html#mailing-lists)
 or [Flink Jira](https://issues.apache.org/jira/browse/FLINK). We welcome suggestions for new algorithms and features as

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/libs/ml/optimization.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/ml/optimization.md b/docs/dev/libs/ml/optimization.md
index 1e3bd2a..5ccde25 100644
--- a/docs/dev/libs/ml/optimization.md
+++ b/docs/dev/libs/ml/optimization.md
@@ -225,9 +225,9 @@ The loss function which is minimized has to implement the `LossFunction` interfa
 Either one defines one's own `LossFunction`, or one uses the `GenericLossFunction` class, which constructs the loss function from an outer loss function and a prediction function.
 An example can be seen here:
 
-```Scala
+{% highlight scala %}
 val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
-```
+{% endhighlight %}
 
 The full list of supported outer loss functions can be found [here](#partial-loss-function-values).
 The full list of supported prediction functions can be found [here](#prediction-function-values).

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/libs/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/storm_compatibility.md b/docs/dev/libs/storm_compatibility.md
index 853b8e1..7d7e6c8 100644
--- a/docs/dev/libs/storm_compatibility.md
+++ b/docs/dev/libs/storm_compatibility.md
@@ -43,13 +43,13 @@ The code resides in the `org.apache.flink.storm` package.
 
 Add the following dependency to your `pom.xml` if you want to execute Storm code in Flink.
 
-~~~xml
+{% highlight xml %}
 <dependency>
 	<groupId>org.apache.flink</groupId>
 	<artifactId>flink-storm{{ site.scala_version_suffix }}</artifactId>
 	<version>{{site.version}}</version>
 </dependency>
-~~~
+{% endhighlight %}
 
 **Please note**: Do not add `storm-core` as a dependency. It is already included via `flink-storm`.
 
@@ -74,7 +74,7 @@ If a topology is executed in a remote cluster, parameters `nimbus.host` and `nim
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-~~~java
+{% highlight java %}
 TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
 
 // actual topology assembling code and used Spouts/Bolts can be used as-is
@@ -95,7 +95,7 @@ if(runLocal) { // submit to test cluster
 	// replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
 	FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
 }
-~~~
+{% endhighlight %}
 </div>
 </div>
 
@@ -118,7 +118,7 @@ The generic type declaration `OUT` specifies the type of the source output strea
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-~~~java
+{% highlight java %}
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 // stream has `raw` type (single field output streams only)
@@ -128,7 +128,7 @@ DataStream<String> rawInput = env.addSource(
 
 // process data stream
 [...]
-~~~
+{% endhighlight %}
 </div>
 </div>
 
@@ -145,7 +145,7 @@ The generic type declarations `IN` and `OUT` specify the type of the operator's
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-~~~java
+{% highlight java %}
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 DataStream<String> text = env.readTextFile(localFilePath);
 
@@ -156,7 +156,7 @@ DataStream<Tuple2<String, Integer>> counts = text.transform(
 
 // do further processing
 [...]
-~~~
+{% endhighlight %}
 </div>
 </div>
 
@@ -192,7 +192,7 @@ Thus, Flink additionally provides `StormConfig` class that can be used like a ra
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-~~~java
+{% highlight java %}
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 StormConfig config = new StormConfig();
@@ -204,7 +204,7 @@ env.getConfig().setGlobalJobParameters(config);
 
 // assemble program with embedded Spouts and/or Bolts
 [...]
-~~~
+{% endhighlight %}
 </div>
 </div>
 
@@ -219,7 +219,7 @@ Furthermore, the wrapper type `SplitStreamTuple<T>` can be removed using `SplitS
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-~~~java
+{% highlight java %}
 [...]
 
 // get DataStream from Spout or Bolt which declares two output streams s1 and s2 with output type SomeType
@@ -233,7 +233,7 @@ DataStream<SomeType> s2 = splitStream.select("s2").map(new SplitStreamMapper<Som
 
 // do further processing on s1 and s2
 [...]
-~~~
+{% endhighlight %}
 </div>
 </div>
 
@@ -259,7 +259,7 @@ An example of a finite Spout that emits records for 10 seconds only:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-~~~java
+{% highlight java %}
 public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
 	[...] // implement open(), nextTuple(), ...
 
@@ -269,7 +269,7 @@ public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
 		return System.currentTimeMillis() - starttime > 10000l;
 	}
 }
-~~~
+{% endhighlight %}
 </div>
 </div>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b13c70b6/docs/dev/local_execution.md
----------------------------------------------------------------------
diff --git a/docs/dev/local_execution.md b/docs/dev/local_execution.md
index 326d515..d03029b 100644
--- a/docs/dev/local_execution.md
+++ b/docs/dev/local_execution.md
@@ -43,13 +43,13 @@ Please also refer to the [debugging section]({{ site.baseurl }}/dev/batch/index.
 
 If you are developing your program in a Maven project, you have to add the `flink-clients` module using this dependency:
 
-~~~xml
+{% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients{{ site.scala_version_suffix }}</artifactId>
   <version>{{site.version}}</version>
 </dependency>
-~~~
+{% endhighlight %}
 
 ## Local Environment
 
@@ -59,7 +59,7 @@ The local environment is instantiated via the method `ExecutionEnvironment.creat
 
 In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is an even better way to go. That method returns a `LocalEnvironment` when the program is started locally (outside the command line interface), and a pre-configured environment for cluster execution when the program is invoked by the [command line interface]({{ site.baseurl }}/ops/cli.html).
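 
 A minimal sketch of that variant:
 
 {% highlight java %}
 // returns a LocalEnvironment when run from the IDE or a plain Java process,
 // and a pre-configured cluster environment when submitted via the CLI
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 {% endhighlight %}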
 
-~~~java
+{% highlight java %}
 public static void main(String[] args) throws Exception {
     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 
@@ -75,17 +75,17 @@ public static void main(String[] args) throws Exception {
 
     JobExecutionResult res = env.execute();
 }
-~~~
+{% endhighlight %}
 
 The `JobExecutionResult` object, which is returned after the execution finished, contains the program runtime and the accumulator results.
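 
 For example, a small sketch of reading values from the returned result (the accumulator name is an assumption):
 
 {% highlight java %}
 JobExecutionResult res = env.execute();
 
 // net program runtime in milliseconds
 long runtimeMillis = res.getNetRuntime();
 
 // result of a user-defined accumulator registered under an assumed name
 Integer numLines = res.getAccumulatorResult("num-lines");
 {% endhighlight %}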
 
 The `LocalEnvironment` also allows passing custom configuration values to Flink.
 
-~~~java
+{% highlight java %}
 Configuration conf = new Configuration();
 conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
 final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
-~~~
+{% endhighlight %}
 
 *Note:* The local execution environments do not start any web frontend to monitor the execution.
 
@@ -97,7 +97,7 @@ Users can use algorithms implemented for batch processing also for cases that ar
 
 **Skeleton for Collection-based execution**
 
-~~~java
+{% highlight java %}
 public static void main(String[] args) throws Exception {
     // initialize a new Collection-based execution environment
     final ExecutionEnvironment env = new CollectionEnvironment();
@@ -118,7 +118,7 @@ public static void main(String[] args) throws Exception {
         System.err.println("Result = "+t);
     }
 }
-~~~
+{% endhighlight %}
 
 The `flink-examples-batch` module contains a full example, called `CollectionExecutionExample`.