You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/30 12:15:46 UTC

[16/51] [partial] flink-web git commit: [hotfix] Manual build of docs

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/examples.html
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/examples.html b/content/docs/master/apis/examples.html
new file mode 100644
index 0000000..8355341
--- /dev/null
+++ b/content/docs/master/apis/examples.html
@@ -0,0 +1,681 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE html>
+
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
+    
+    <title>Apache Flink 0.10-SNAPSHOT Documentation: Bundled Examples</title>
+    
+    <link rel="shortcut icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
+    <link rel="icon" href="http://flink.apache.org/docs/master/page/favicon.ico" type="image/x-icon">
+
+    <!-- Bootstrap -->
+    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+    <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/flink.css">
+    <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/syntax.css">
+    <link rel="stylesheet" href="http://flink.apache.org/docs/master/page/css/codetabs.css">
+    
+    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
+    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
+    <!--[if lt IE 9]>
+      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
+      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
+    <![endif]-->
+  </head>
+  <body>
+    
+    
+
+
+
+
+    <!-- Top navbar. -->
+    <nav class="navbar navbar-default navbar-fixed-top">
+      <div class="container">
+        <!-- The logo. -->
+        <div class="navbar-header">
+          <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+          </button>
+          <div class="navbar-logo">
+            <a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/master/page/img/navbar-brand-logo.jpg"></a>
+          </div>
+        </div><!-- /.navbar-header -->
+
+        <!-- The navigation links. -->
+        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
+          <ul class="nav navbar-nav">
+            <li><a href="http://flink.apache.org/docs/master/index.html">Overview<span class="hidden-sm hidden-xs"> 0.10</span></a></li>
+
+            <!-- Setup -->
+            <li class="dropdown">
+              <a href="http://flink.apache.org/docs/master/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
+              <ul class="dropdown-menu" role="menu">
+                <li><a href="http://flink.apache.org/docs/master/setup/building.html">Get Flink 0.10-SNAPSHOT</a></li>
+
+                <li class="divider"></li>
+                <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
+                <li><a href="http://flink.apache.org/docs/master/setup/local_setup.html" class="active">Local</a></li>
+                <li><a href="http://flink.apache.org/docs/master/setup/cluster_setup.html">Cluster (Standalone)</a></li>
+                <li><a href="http://flink.apache.org/docs/master/setup/yarn_setup.html">YARN</a></li>
+                <li><a href="http://flink.apache.org/docs/master/setup/gce_setup.html">GCloud</a></li>
+                <li><a href="http://flink.apache.org/docs/master/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
+
+                <li class="divider"></li>
+                <li><a href="http://flink.apache.org/docs/master/setup/config.html">Configuration</a></li>
+              </ul>
+            </li>
+
+            <!-- Programming Guides -->
+            <li class="dropdown">
+              <a href="http://flink.apache.org/docs/master/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
+              <ul class="dropdown-menu" role="menu">
+                <li><a href="http://flink.apache.org/docs/master/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/python.html">Python API <span class="badge">Beta</span></a></li>
+
+                <li class="divider"></li>
+                <li><a href="scala_shell.html">Interactive Scala Shell</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/dataset_transformations.html">Dataset Transformations</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/best_practices.html">Best Practices</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/example_connectors.html">Connectors</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/examples.html">Examples</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/local_execution.html">Local Execution</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/cluster_execution.html">Cluster Execution</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/cli.html">Command Line Interface</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/web_client.html">Web Client</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/iterations.html">Iterations</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/java8.html">Java 8</a></li>
+                <li><a href="http://flink.apache.org/docs/master/apis/hadoop_compatibility.html">Hadoop Compatability <span class="badge">Beta</span></a></li>
+              </ul>
+            </li>
+
+            <!-- Libraries -->
+            <li class="dropdown">
+              <a href="http://flink.apache.org/docs/master/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
+                <ul class="dropdown-menu" role="menu">
+                  <li><a href="http://flink.apache.org/docs/master/libs/spargel_guide.html">Graphs: Spargel</a></li>
+                  <li><a href="http://flink.apache.org/docs/master/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
+                  <li><a href="http://flink.apache.org/docs/master/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
+                  <li><a href="http://flink.apache.org/docs/master/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
+              </ul>
+            </li>
+
+            <!-- Internals -->
+            <li class="dropdown">
+              <a href="http://flink.apache.org/docs/master/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
+              <ul class="dropdown-menu" role="menu">
+                <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
+                <li><a href="http://flink.apache.org/docs/master/internals/how_to_contribute.html">How to Contribute</a></li>
+                <li><a href="http://flink.apache.org/docs/master/internals/coding_guidelines.html">Coding Guidelines</a></li>
+                <li><a href="http://flink.apache.org/docs/master/internals/ide_setup.html">IDE Setup</a></li>
+                <li><a href="http://flink.apache.org/docs/master/internals/logging.html">Logging</a></li>
+                <li class="divider"></li>
+                <li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
+                <li><a href="http://flink.apache.org/docs/master/internals/general_arch.html">Architecture &amp; Process Model</a></li>
+                <li><a href="http://flink.apache.org/docs/master/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
+                <li><a href="http://flink.apache.org/docs/master/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
+                <li><a href="http://flink.apache.org/docs/master/internals/add_operator.html">How-To: Add an Operator</a></li>
+              </ul>
+            </li>
+          </ul>
+          <form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/master/search-results.html">
+            <div class="form-group">
+              <input type="text" class="form-control" name="q" placeholder="Search all pages">
+            </div>
+            <button type="submit" class="btn btn-default">Search</button>
+          </form>
+        </div><!-- /.navbar-collapse -->
+      </div><!-- /.container -->
+    </nav>
+
+
+    
+
+    <!-- Main content. -->
+    <div class="container">
+      
+      
+<div class="row">
+  <div class="col-sm-10 col-sm-offset-1">
+    <h1>Bundled Examples</h1>
+
+
+
+<p>The following example programs showcase different applications of Flink 
+from simple word counting to graph algorithms. The code samples illustrate the 
+use of <a href="programming_guide.html">Flink’s API</a>.</p>
+
+<p>The full source code of the following and more examples can be found in the <strong>flink-java-examples</strong>
+or <strong>flink-scala-examples</strong> module of the Flink source repository.</p>
+
+<ul id="markdown-toc">
+  <li><a href="#running-an-example" id="markdown-toc-running-an-example">Running an example</a></li>
+  <li><a href="#word-count" id="markdown-toc-word-count">Word Count</a></li>
+  <li><a href="#page-rank" id="markdown-toc-page-rank">Page Rank</a></li>
+  <li><a href="#connected-components" id="markdown-toc-connected-components">Connected Components</a></li>
+  <li><a href="#relational-query" id="markdown-toc-relational-query">Relational Query</a></li>
+</ul>
+
+<h2 id="running-an-example">Running an example</h2>
+
+<p>In order to run a Flink example, we assume you have a running Flink instance available. The “Setup” tab in the navigation describes various ways of starting Flink.</p>
+
+<p>The easiest way is running the <code>./bin/start-local.sh</code> script, which will start a JobManager locally.</p>
+
+<p>Each binary release of Flink contains an <code>examples</code> directory with jar files for each of the examples on this page.</p>
+
+<p>To run the WordCount example, issue the following command:</p>
+
+<div class="highlight"><pre><code class="language-bash">./bin/flink run ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar</code></pre></div>
+
+<p>The other examples can be started in a similar way.</p>
+
+<p>Note that many examples run without passing any arguments for them, by using build-in data. To run WordCount with real data, you have to pass the path to the data:</p>
+
+<div class="highlight"><pre><code class="language-bash">./bin/flink run ./examples/flink-java-examples-0.10-SNAPSHOT-WordCount.jar /path/to/some/text/data /path/to/result</code></pre></div>
+
+<p>Note that non-local file systems require a schema prefix, such as <code>hdfs://</code>.</p>
+
+<h2 id="word-count">Word Count</h2>
+<p>WordCount is the “Hello World” of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the texts are splits the text to individual words. Second, the words are grouped and counted.</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readTextFile</span><span class="o">(</span><span class="s">&quot;/path/to/file&quot;</span><span class="o">);</span> 
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">counts</span> <span class="o">=</span> 
+        <span class="c1">// split up the lines in pairs (2-tuples) containing: (word,1)</span>
+        <span class="n">text</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">())</span>
+        <span class="c1">// group by the tuple field &quot;0&quot; and sum up tuple field &quot;1&quot;</span>
+        <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
+
+<span class="n">counts</span><span class="o">.</span><span class="na">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">,</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">);</span>
+
+<span class="c1">// User-defined functions</span>
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Tokenizer</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+        <span class="c1">// normalize and split the line</span>
+        <span class="n">String</span><span class="o">[]</span> <span class="n">tokens</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">);</span>
+        
+        <span class="c1">// emit the pairs</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">token</span> <span class="o">:</span> <span class="n">tokens</span><span class="o">)</span> <span class="o">{</span>
+            <span class="k">if</span> <span class="o">(</span><span class="n">token</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
+                <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">token</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
+            <span class="o">}</span>   
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+    <p>The <a href="https://github.com/apache/flink/blob/master//flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java">WordCount example</a> implements the above described algorithm with input parameters: <code>&lt;text input path&gt;, &lt;output path&gt;</code>. As test data, any text file will do.</p>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
+
+<span class="c1">// get input data</span>
+<span class="k">val</span> <span class="n">text</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readTextFile</span><span class="o">(</span><span class="s">&quot;/path/to/file&quot;</span><span class="o">)</span>
+
+<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">text</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">toLowerCase</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)</span> <span class="n">filter</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">nonEmpty</span> <span class="o">}</span> <span class="o">}</span>
+  <span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span>
+  <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
+
+<span class="n">counts</span><span class="o">.</span><span class="n">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">,</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">)</span></code></pre></div>
+
+    <p>The <a href="https://github.com/apache/flink/blob/master//flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala">WordCount example</a> implements the above described algorithm with input parameters: <code>&lt;text input path&gt;, &lt;output path&gt;</code>. As test data, any text file will do.</p>
+
+  </div>
+</div>
+
+<h2 id="page-rank">Page Rank</h2>
+
+<p>The PageRank algorithm computes the “importance” of pages in a graph defined by links, which point from one pages to another page. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and compute its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine which uses the importance of webpages to rank the results of search queries.</p>
+
+<p>In this simple example, PageRank is implemented with a <a href="iterations.html">bulk iteration</a> and a fixed number of iterations.</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
+
+<span class="c1">// read the pages and initial ranks by parsing a CSV file</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">pagesWithRanks</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readCsvFile</span><span class="o">(</span><span class="n">pagesInputPath</span><span class="o">)</span>
+						   <span class="o">.</span><span class="na">types</span><span class="o">(</span><span class="n">Long</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Double</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
+
+<span class="c1">// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">[]&gt;&gt;</span> <span class="n">pageLinkLists</span> <span class="o">=</span> <span class="n">getLinksDataSet</span><span class="o">(</span><span class="n">env</span><span class="o">);</span>
+
+<span class="c1">// set iterative data set</span>
+<span class="n">IterativeDataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">iteration</span> <span class="o">=</span> <span class="n">pagesWithRanks</span><span class="o">.</span><span class="na">iterate</span><span class="o">(</span><span class="n">maxIterations</span><span class="o">);</span>
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">newRanks</span> <span class="o">=</span> <span class="n">iteration</span>
+        <span class="c1">// join pages with outgoing edges and distribute rank</span>
+        <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">pageLinkLists</span><span class="o">).</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">JoinVertexWithEdgesMatch</span><span class="o">())</span>
+        <span class="c1">// collect and sum ranks</span>
+        <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">sum</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
+        <span class="c1">// apply dampening factor</span>
+        <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">Dampener</span><span class="o">(</span><span class="n">DAMPENING_FACTOR</span><span class="o">,</span> <span class="n">numPages</span><span class="o">));</span>
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">finalPageRanks</span> <span class="o">=</span> <span class="n">iteration</span><span class="o">.</span><span class="na">closeWith</span><span class="o">(</span>
+        <span class="n">newRanks</span><span class="o">,</span> 
+        <span class="n">newRanks</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">iteration</span><span class="o">).</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+        <span class="c1">// termination condition</span>
+        <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="nf">EpsilonFilter</span><span class="o">()));</span>
+
+<span class="n">finalPageRanks</span><span class="o">.</span><span class="na">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">,</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">);</span>
+
+<span class="c1">// User-defined functions</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">JoinVertexWithEdgesMatch</span> 
+                    <span class="kd">implements</span> <span class="n">FlatJoinFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">[]&gt;,</span> 
+                                            <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">join</span><span class="o">(&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">page</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">[]&gt;</span> <span class="n">adj</span><span class="o">,</span> 
+                        <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">Long</span><span class="o">[]</span> <span class="n">neigbors</span> <span class="o">=</span> <span class="n">adj</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+        <span class="kt">double</span> <span class="n">rank</span> <span class="o">=</span> <span class="n">page</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+        <span class="kt">double</span> <span class="n">rankToDistribute</span> <span class="o">=</span> <span class="n">rank</span> <span class="o">/</span> <span class="o">((</span><span class="kt">double</span><span class="o">)</span> <span class="n">neigbors</span><span class="o">.</span><span class="na">length</span><span class="o">);</span>
+            
+        <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="n">neigbors</span><span class="o">.</span><span class="na">length</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
+            <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;(</span><span class="n">neigbors</span><span class="o">[</span><span class="n">i</span><span class="o">],</span> <span class="n">rankToDistribute</span><span class="o">));</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">Dampener</span> <span class="kd">implements</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span><span class="n">Double</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span><span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+    <span class="kd">private</span> <span class="kd">final</span> <span class="kt">double</span> <span class="n">dampening</span><span class="o">,</span> <span class="n">randomJump</span><span class="o">;</span>
+
+    <span class="kd">public</span> <span class="nf">Dampener</span><span class="o">(</span><span class="kt">double</span> <span class="n">dampening</span><span class="o">,</span> <span class="kt">double</span> <span class="n">numVertices</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">dampening</span> <span class="o">=</span> <span class="n">dampening</span><span class="o">;</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">randomJump</span> <span class="o">=</span> <span class="o">(</span><span class="mi">1</span> <span class="o">-</span> <span class="n">dampening</span><span class="o">)</span> <span class="o">/</span> <span class="n">numVertices</span><span class="o">;</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">value</span><span class="o">.</span><span class="na">f1</span> <span class="o">=</span> <span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">f1</span> <span class="o">*</span> <span class="n">dampening</span><span class="o">)</span> <span class="o">+</span> <span class="n">randomJump</span><span class="o">;</span>
+        <span class="k">return</span> <span class="n">value</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">EpsilonFilter</span> 
+                <span class="kd">implements</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;&gt;</span> <span class="o">{</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">abs</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">f0</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">value</span><span class="o">.</span><span class="na">f1</span><span class="o">.</span><span class="na">f1</span><span class="o">)</span> <span class="o">&gt;</span> <span class="n">EPSILON</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+    <p>The <a href="https://github.com/apache/flink/blob/master//flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java">PageRank program</a> implements the above example.
+It requires the following parameters to run: <code>&lt;pages input path&gt;, &lt;links input path&gt;, &lt;output path&gt;, &lt;num pages&gt;, &lt;num iterations&gt;</code>.</p>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="c1">// User-defined types</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">Link</span><span class="o">(</span><span class="n">sourceId</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">targetId</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">Page</span><span class="o">(</span><span class="n">pageId</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">rank</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">AdjacencyList</span><span class="o">(</span><span class="n">sourceId</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">targetIds</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">Long</span><span class="o">])</span>
+
+<span class="c1">// set up execution environment</span>
+<span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
+
+<span class="c1">// read the pages and initial ranks by parsing a CSV file</span>
+<span class="k">val</span> <span class="n">pages</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readCsvFile</span><span class="o">[</span><span class="kt">Page</span><span class="o">](</span><span class="n">pagesInputPath</span><span class="o">)</span>
+
+<span class="c1">// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))</span>
+<span class="k">val</span> <span class="n">links</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">readCsvFile</span><span class="o">[</span><span class="kt">Link</span><span class="o">](</span><span class="n">linksInputPath</span><span class="o">)</span>
+
+<span class="c1">// assign initial ranks to pages</span>
+<span class="k">val</span> <span class="n">pagesWithRanks</span> <span class="k">=</span> <span class="n">pages</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">p</span> <span class="k">=&gt;</span> <span class="nc">Page</span><span class="o">(</span><span class="n">p</span><span class="o">,</span> <span class="mf">1.0</span> <span class="o">/</span> <span class="n">numPages</span><span class="o">))</span>
+
+<span class="c1">// build adjacency list from link input</span>
+<span class="k">val</span> <span class="n">adjacencyLists</span> <span class="k">=</span> <span class="n">links</span>
+  <span class="c1">// initialize lists</span>
+  <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">e</span> <span class="k">=&gt;</span> <span class="nc">AdjacencyList</span><span class="o">(</span><span class="n">e</span><span class="o">.</span><span class="n">sourceId</span><span class="o">,</span> <span class="nc">Array</span><span class="o">(</span><span class="n">e</span><span class="o">.</span><span class="n">targetId</span><span class="o">)))</span>
+  <span class="c1">// concatenate lists</span>
+  <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;sourceId&quot;</span><span class="o">).</span><span class="n">reduce</span> <span class="o">{</span>
+  <span class="o">(</span><span class="n">l1</span><span class="o">,</span> <span class="n">l2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">AdjacencyList</span><span class="o">(</span><span class="n">l1</span><span class="o">.</span><span class="n">sourceId</span><span class="o">,</span> <span class="n">l1</span><span class="o">.</span><span class="n">targetIds</span> <span class="o">++</span> <span class="n">l2</span><span class="o">.</span><span class="n">targetIds</span><span class="o">)</span>
+  <span class="o">}</span>
+
+<span class="c1">// start iteration</span>
+<span class="k">val</span> <span class="n">finalRanks</span> <span class="k">=</span> <span class="n">pagesWithRanks</span><span class="o">.</span><span class="n">iterateWithTermination</span><span class="o">(</span><span class="n">maxIterations</span><span class="o">)</span> <span class="o">{</span>
+  <span class="n">currentRanks</span> <span class="k">=&gt;</span>
+    <span class="k">val</span> <span class="n">newRanks</span> <span class="k">=</span> <span class="n">currentRanks</span>
+      <span class="c1">// distribute ranks to target pages</span>
+      <span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">adjacencyLists</span><span class="o">).</span><span class="n">where</span><span class="o">(</span><span class="s">&quot;pageId&quot;</span><span class="o">).</span><span class="n">equalTo</span><span class="o">(</span><span class="s">&quot;sourceId&quot;</span><span class="o">)</span> <span class="o">{</span>
+        <span class="o">(</span><span class="n">page</span><span class="o">,</span> <span class="n">adjacent</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">Page</span><span class="o">])</span> <span class="k">=&gt;</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">targetId</span> <span class="k">&lt;-</span> <span class="n">adjacent</span><span class="o">.</span><span class="n">targetIds</span><span class="o">)</span> <span class="o">{</span>
+          <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">Page</span><span class="o">(</span><span class="n">targetId</span><span class="o">,</span> <span class="n">page</span><span class="o">.</span><span class="n">rank</span> <span class="o">/</span> <span class="n">adjacent</span><span class="o">.</span><span class="n">targetIds</span><span class="o">.</span><span class="n">length</span><span class="o">))</span>
+        <span class="o">}</span>
+      <span class="o">}</span>
+      <span class="c1">// collect ranks and sum them up</span>
+      <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;pageId&quot;</span><span class="o">).</span><span class="n">aggregate</span><span class="o">(</span><span class="nc">SUM</span><span class="o">,</span> <span class="s">&quot;rank&quot;</span><span class="o">)</span>
+      <span class="c1">// apply dampening factor</span>
+      <span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">p</span> <span class="k">=&gt;</span>
+        <span class="nc">Page</span><span class="o">(</span><span class="n">p</span><span class="o">.</span><span class="n">pageId</span><span class="o">,</span> <span class="o">(</span><span class="n">p</span><span class="o">.</span><span class="n">rank</span> <span class="o">*</span> <span class="nc">DAMPENING_FACTOR</span><span class="o">)</span> <span class="o">+</span> <span class="o">((</span><span class="mi">1</span> <span class="o">-</span> <span class="nc">DAMPENING_FACTOR</span><span class="o">)</span> <span class="o">/</span> <span class="n">numPages</span><span class="o">))</span>
+      <span class="o">}</span>
+
+    <span class="c1">// terminate if no rank update was significant</span>
+    <span class="k">val</span> <span class="n">termination</span> <span class="k">=</span> <span class="n">currentRanks</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">newRanks</span><span class="o">).</span><span class="n">where</span><span class="o">(</span><span class="s">&quot;pageId&quot;</span><span class="o">).</span><span class="n">equalTo</span><span class="o">(</span><span class="s">&quot;pageId&quot;</span><span class="o">)</span> <span class="o">{</span>
+      <span class="o">(</span><span class="n">current</span><span class="o">,</span> <span class="n">next</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">Int</span><span class="o">])</span> <span class="k">=&gt;</span>
+        <span class="c1">// check for significant update</span>
+        <span class="k">if</span> <span class="o">(</span><span class="n">math</span><span class="o">.</span><span class="n">abs</span><span class="o">(</span><span class="n">current</span><span class="o">.</span><span class="n">rank</span> <span class="o">-</span> <span class="n">next</span><span class="o">.</span><span class="n">rank</span><span class="o">)</span> <span class="o">&gt;</span> <span class="nc">EPSILON</span><span class="o">)</span> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
+    <span class="o">}</span>
+
+    <span class="o">(</span><span class="n">newRanks</span><span class="o">,</span> <span class="n">termination</span><span class="o">)</span>
+<span class="o">}</span>
+
+<span class="k">val</span> <span class="n">result</span> <span class="k">=</span> <span class="n">finalRanks</span>
+
+<span class="c1">// emit result</span>
+<span class="n">result</span><span class="o">.</span><span class="n">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">,</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">)</span></code></pre></div>
+
+    <p>he <a href="https://github.com/apache/flink/blob/master//flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala">PageRank program</a> implements the above example.
+It requires the following parameters to run: <code>&lt;pages input path&gt;, &lt;links input path&gt;, &lt;output path&gt;, &lt;num pages&gt;, &lt;num iterations&gt;</code>.</p>
+  </div>
+</div>
+
+<p>Input files are plain text files and must be formatted as follows:
+- Pages represented as an (long) ID separated by new-line characters.
+    * For example <code>"1\n2\n12\n42\n63\n"</code> gives five pages with IDs 1, 2, 12, 42, and 63.
+- Links are represented as pairs of page IDs which are separated by space characters. Links are separated by new-line characters:
+    * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (directed) links (1)-&gt;(2), (2)-&gt;(12), (1)-&gt;(12), and (42)-&gt;(63).</p>
+
+<p>For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).</p>
+
+<h2 id="connected-components">Connected Components</h2>
+
+<p>The Connected Components algorithm identifies parts of a larger graph which are connected by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor, if it is smaller than its own component ID.</p>
+
+<p>This implementation uses a <a href="iterations.html">delta iteration</a>: Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="c1">// read vertex and edge data</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">vertices</span> <span class="o">=</span> <span class="n">getVertexDataSet</span><span class="o">(</span><span class="n">env</span><span class="o">);</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">edges</span> <span class="o">=</span> <span class="n">getEdgeDataSet</span><span class="o">(</span><span class="n">env</span><span class="o">).</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">UndirectEdge</span><span class="o">());</span>
+
+<span class="c1">// assign the initial component IDs (equal to the vertex ID)</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">verticesWithInitialId</span> <span class="o">=</span> <span class="n">vertices</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">DuplicateValue</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;());</span>
+        
+<span class="c1">// open a delta iteration</span>
+<span class="n">DeltaIteration</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">iteration</span> <span class="o">=</span>
+        <span class="n">verticesWithInitialId</span><span class="o">.</span><span class="na">iterateDelta</span><span class="o">(</span><span class="n">verticesWithInitialId</span><span class="o">,</span> <span class="n">maxIterations</span><span class="o">,</span> <span class="mi">0</span><span class="o">);</span>
+
+<span class="c1">// apply the step logic: </span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">changes</span> <span class="o">=</span> <span class="n">iteration</span><span class="o">.</span><span class="na">getWorkset</span><span class="o">()</span>
+        <span class="c1">// join with the edges</span>
+        <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">edges</span><span class="o">).</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="nf">NeighborWithComponentIDJoin</span><span class="o">())</span>
+        <span class="c1">// select the minimum neighbor component ID</span>
+        <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">aggregate</span><span class="o">(</span><span class="n">Aggregations</span><span class="o">.</span><span class="na">MIN</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span>
+        <span class="c1">// update if the component ID of the candidate is smaller</span>
+        <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">iteration</span><span class="o">.</span><span class="na">getSolutionSet</span><span class="o">()).</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">ComponentIdFilter</span><span class="o">());</span>
+
+<span class="c1">// close the delta iteration (delta and new workset are identical)</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">iteration</span><span class="o">.</span><span class="na">closeWith</span><span class="o">(</span><span class="n">changes</span><span class="o">,</span> <span class="n">changes</span><span class="o">);</span>
+
+<span class="c1">// emit result</span>
+<span class="n">result</span><span class="o">.</span><span class="na">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">,</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">);</span>
+
+<span class="c1">// User-defined functions</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">DuplicateValue</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="kd">implements</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">T</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">T</span><span class="o">,</span> <span class="n">T</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+    
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">T</span><span class="o">,</span> <span class="n">T</span><span class="o">&gt;</span> <span class="nf">map</span><span class="o">(</span><span class="n">T</span> <span class="n">vertex</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">T</span><span class="o">,</span> <span class="n">T</span><span class="o">&gt;(</span><span class="n">vertex</span><span class="o">,</span> <span class="n">vertex</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">UndirectEdge</span> 
+                    <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+    <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">invertedEdge</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;();</span>
+    
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">edge</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">invertedEdge</span><span class="o">.</span><span class="na">f0</span> <span class="o">=</span> <span class="n">edge</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+        <span class="n">invertedEdge</span><span class="o">.</span><span class="na">f1</span> <span class="o">=</span> <span class="n">edge</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
+        <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">edge</span><span class="o">);</span>
+        <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">invertedEdge</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">NeighborWithComponentIDJoin</span> 
+                <span class="kd">implements</span> <span class="n">JoinFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="nf">join</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">vertexWithComponent</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">edge</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;(</span><span class="n">edge</span><span class="o">.</span><span class="na">f1</span><span class="o">,</span> <span class="n">vertexWithComponent</span><span class="o">.</span><span class="na">f1</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">ComponentIdFilter</span> 
+                    <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;,</span> 
+                                            <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">value</span><span class="o">,</span> 
+                        <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">if</span> <span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">f0</span><span class="o">.</span><span class="na">f1</span> <span class="o">&lt;</span> <span class="n">value</span><span class="o">.</span><span class="na">f1</span><span class="o">.</span><span class="na">f1</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">f0</span><span class="o">);</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+    <p>The <a href="https://github.com/apache/flink/blob/master//flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java">ConnectedComponents program</a> implements the above example. It requires the following parameters to run: <code>&lt;vertex input path&gt;, &lt;edge input path&gt;, &lt;output path&gt; &lt;max num iterations&gt;</code>.</p>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="c1">// set up execution environment</span>
+<span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">ExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
+
+<span class="c1">// read vertex and edge data</span>
+<span class="c1">// assign the initial components (equal to the vertex id)</span>
+<span class="k">val</span> <span class="n">vertices</span> <span class="k">=</span> <span class="n">getVerticesDataSet</span><span class="o">(</span><span class="n">env</span><span class="o">).</span><span class="n">map</span> <span class="o">{</span> <span class="n">id</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">id</span><span class="o">,</span> <span class="n">id</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="c1">// undirected edges by emitting for each input edge the input edges itself and an inverted</span>
+<span class="c1">// version</span>
+<span class="k">val</span> <span class="n">edges</span> <span class="k">=</span> <span class="n">getEdgesDataSet</span><span class="o">(</span><span class="n">env</span><span class="o">).</span><span class="n">flatMap</span> <span class="o">{</span> <span class="n">edge</span> <span class="k">=&gt;</span> <span class="nc">Seq</span><span class="o">(</span><span class="n">edge</span><span class="o">,</span> <span class="o">(</span><span class="n">edge</span><span class="o">.</span><span class="n">_2</span><span class="o">,</span> <span class="n">edge</span><span class="o">.</span><span class="n">_1</span><span class="o">))</span> <span class="o">}</span>
+
+<span class="c1">// open a delta iteration</span>
+<span class="k">val</span> <span class="n">verticesWithComponents</span> <span class="k">=</span> <span class="n">vertices</span><span class="o">.</span><span class="n">iterateDelta</span><span class="o">(</span><span class="n">vertices</span><span class="o">,</span> <span class="n">maxIterations</span><span class="o">,</span> <span class="nc">Array</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="o">{</span>
+  <span class="o">(</span><span class="n">s</span><span class="o">,</span> <span class="n">ws</span><span class="o">)</span> <span class="k">=&gt;</span>
+
+    <span class="c1">// apply the step logic: join with the edges</span>
+    <span class="k">val</span> <span class="n">allNeighbors</span> <span class="k">=</span> <span class="n">ws</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">edges</span><span class="o">).</span><span class="n">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">{</span> <span class="o">(</span><span class="n">vertex</span><span class="o">,</span> <span class="n">edge</span><span class="o">)</span> <span class="k">=&gt;</span>
+      <span class="o">(</span><span class="n">edge</span><span class="o">.</span><span class="n">_2</span><span class="o">,</span> <span class="n">vertex</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span>
+    <span class="o">}</span>
+
+    <span class="c1">// select the minimum neighbor</span>
+    <span class="k">val</span> <span class="n">minNeighbors</span> <span class="k">=</span> <span class="n">allNeighbors</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">min</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
+
+    <span class="c1">// update if the component of the candidate is smaller</span>
+    <span class="k">val</span> <span class="n">updatedComponents</span> <span class="k">=</span> <span class="n">minNeighbors</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">s</span><span class="o">).</span><span class="n">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
+      <span class="o">(</span><span class="n">newVertex</span><span class="o">,</span> <span class="n">oldVertex</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">Long</span>, <span class="kt">Long</span><span class="o">)])</span> <span class="k">=&gt;</span>
+        <span class="k">if</span> <span class="o">(</span><span class="n">newVertex</span><span class="o">.</span><span class="n">_2</span> <span class="o">&lt;</span> <span class="n">oldVertex</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">newVertex</span><span class="o">)</span>
+    <span class="o">}</span>
+
+    <span class="c1">// delta and new workset are identical</span>
+    <span class="o">(</span><span class="n">updatedComponents</span><span class="o">,</span> <span class="n">updatedComponents</span><span class="o">)</span>
+<span class="o">}</span>
+
+<span class="n">verticesWithComponents</span><span class="o">.</span><span class="n">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">,</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="s">&quot; &quot;</span><span class="o">)</span></code></pre></div>
+
+    <p>The <a href="https://github.com/apache/flink/blob/master//flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala">ConnectedComponents program</a> implements the above example. It requires the following parameters to run: <code>&lt;vertex input path&gt;, &lt;edge input path&gt;, &lt;output path&gt; &lt;max num iterations&gt;</code>.</p>
+  </div>
+</div>
+
+<p>Input files are plain text files and must be formatted as follows:
+- Vertices represented as IDs and separated by new-line characters.
+    * For example <code>"1\n2\n12\n42\n63\n"</code> gives five vertices with (1), (2), (12), (42), and (63).
+- Edges are represented as pairs for vertex IDs which are separated by space characters. Edges are separated by new-line characters:
+    * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63).</p>
+
+<h2 id="relational-query">Relational Query</h2>
+
+<p>The Relational Query example assumes two tables, one with <code>orders</code> and the other with <code>lineitems</code> as specified by the <a href="http://www.tpc.org/tpch/">TPC-H decision support benchmark</a>. TPC-H is a standard benchmark in the database industry. See below for instructions how to generate the input data.</p>
+
+<p>The example implements the following SQL query.</p>
+
+<div class="highlight"><pre><code class="language-sql"><span class="k">SELECT</span> <span class="n">l_orderkey</span><span class="p">,</span> <span class="n">o_shippriority</span><span class="p">,</span> <span class="k">sum</span><span class="p">(</span><span class="n">l_extendedprice</span><span class="p">)</span> <span class="k">as</span> <span class="n">revenue</span>
+    <span class="k">FROM</span> <span class="n">orders</span><span class="p">,</span> <span class="n">lineitem</span>
+<span class="k">WHERE</span> <span class="n">l_orderkey</span> <span class="o">=</span> <span class="n">o_orderkey</span>
+    <span class="k">AND</span> <span class="n">o_orderstatus</span> <span class="o">=</span> <span class="ss">&quot;F&quot;</span> 
+    <span class="k">AND</span> <span class="k">YEAR</span><span class="p">(</span><span class="n">o_orderdate</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1993</span>
+    <span class="k">AND</span> <span class="n">o_orderpriority</span> <span class="k">LIKE</span> <span class="ss">&quot;5%&quot;</span>
+<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">l_orderkey</span><span class="p">,</span> <span class="n">o_shippriority</span><span class="p">;</span></code></pre></div>
+
+<p>The Flink program, which implements the above query looks as follows.</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="c1">// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple5</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">orders</span> <span class="o">=</span> <span class="n">getOrdersDataSet</span><span class="o">(</span><span class="n">env</span><span class="o">);</span>
+<span class="c1">// get lineitem data set: (orderkey, extendedprice)</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">lineitems</span> <span class="o">=</span> <span class="n">getLineitemDataSet</span><span class="o">(</span><span class="n">env</span><span class="o">);</span>
+
+<span class="c1">// orders filtered by year: (orderkey, custkey)</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">ordersFilteredByYear</span> <span class="o">=</span>
+        <span class="c1">// filter orders</span>
+        <span class="n">orders</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span>
+            <span class="k">new</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Tuple5</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
+                <span class="nd">@Override</span>
+                <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Tuple5</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span>
+                    <span class="c1">// status filter</span>
+                    <span class="k">if</span><span class="o">(!</span><span class="n">t</span><span class="o">.</span><span class="na">f1</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">STATUS_FILTER</span><span class="o">))</span> <span class="o">{</span>
+                        <span class="k">return</span> <span class="kc">false</span><span class="o">;</span>
+                    <span class="c1">// year filter</span>
+                    <span class="o">}</span> <span class="k">else</span> <span class="nf">if</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">t</span><span class="o">.</span><span class="na">f2</span><span class="o">.</span><span class="na">substring</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">4</span><span class="o">))</span> <span class="o">&lt;=</span> <span class="n">YEAR_FILTER</span><span class="o">)</span> <span class="o">{</span>
+                        <span class="k">return</span> <span class="kc">false</span><span class="o">;</span>
+                    <span class="c1">// order priority filter</span>
+                    <span class="o">}</span> <span class="k">else</span> <span class="nf">if</span><span class="o">(!</span><span class="n">t</span><span class="o">.</span><span class="na">f3</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="n">OPRIO_FILTER</span><span class="o">))</span> <span class="o">{</span>
+                        <span class="k">return</span> <span class="kc">false</span><span class="o">;</span>
+                    <span class="o">}</span>
+                    <span class="k">return</span> <span class="kc">true</span><span class="o">;</span>
+                <span class="o">}</span>
+            <span class="o">})</span>
+        <span class="c1">// project fields out that are no longer required</span>
+        <span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">4</span><span class="o">).</span><span class="na">types</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Integer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
+
+<span class="c1">// join orders with lineitems: (orderkey, shippriority, extendedprice)</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">lineitemsOfOrders</span> <span class="o">=</span> 
+        <span class="n">ordersFilteredByYear</span><span class="o">.</span><span class="na">joinWithHuge</span><span class="o">(</span><span class="n">lineitems</span><span class="o">)</span>
+                            <span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">equalTo</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+                            <span class="o">.</span><span class="na">projectFirst</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">1</span><span class="o">).</span><span class="na">projectSecond</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
+                            <span class="o">.</span><span class="na">types</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Integer</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Double</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
+
+<span class="c1">// extendedprice sums: (orderkey, shippriority, sum(extendedprice))</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">priceSums</span> <span class="o">=</span> 
+        <span class="c1">// group by order and sum extendedprice</span>
+        <span class="n">lineitemsOfOrders</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">1</span><span class="o">).</span><span class="na">aggregate</span><span class="o">(</span><span class="n">Aggregations</span><span class="o">.</span><span class="na">SUM</span><span class="o">,</span> <span class="mi">2</span><span class="o">);</span>
+
+<span class="c1">// emit result</span>
+<span class="n">priceSums</span><span class="o">.</span><span class="na">writeAsCsv</span><span class="o">(</span><span class="n">outputPath</span><span class="o">);</span></code></pre></div>
+
+    <p>The <a href="https://github.com/apache/flink/blob/master//flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java">Relational Query program</a> implements the above query. It requires the following parameters to run: <code>&lt;orders input path&gt;, &lt;lineitem input path&gt;, &lt;output path&gt;</code>.</p>
+
+  </div>
+  <div data-lang="scala">
+    <p>Coming soon…</p>
+
+    <p>The <a href="https://github.com/apache/flink/blob/master//flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala">Relational Query program</a> implements the above query. It requires the following parameters to run: <code>&lt;orders input path&gt;, &lt;lineitem input path&gt;, &lt;output path&gt;</code>.</p>
+
+  </div>
+</div>
+
+<p>The orders and lineitem files can be generated using the <a href="http://www.tpc.org/tpch/">TPC-H benchmark</a> suite’s data generator tool (DBGEN). 
+Take the following steps to generate arbitrary large input files for the provided Flink programs:</p>
+
+<ol>
+  <li>Download and unpack DBGEN</li>
+  <li>Make a copy of <em>makefile.suite</em> called <em>Makefile</em> and perform the following changes:</li>
+</ol>
+
+<div class="highlight"><pre><code class="language-bash"><span class="nv">DATABASE</span> <span class="o">=</span> DB2
+<span class="nv">MACHINE</span>  <span class="o">=</span> LINUX
+<span class="nv">WORKLOAD</span> <span class="o">=</span> TPCH
+<span class="nv">CC</span>       <span class="o">=</span> gcc</code></pre></div>
+
+<ol>
+  <li>Build DBGEN using <em>make</em></li>
+  <li>Generate lineitem and orders relations using dbgen. A scale factor
+(-s) of 1 results in a generated data set with about 1 GB size.</li>
+</ol>
+
+<div class="highlight"><pre><code class="language-bash">./dbgen -T o -s 1</code></pre></div>
+
+  </div>
+
+  <div class="col-sm-10 col-sm-offset-1">
+    <!-- Disqus thread and some vertical offset -->
+    <div style="margin-top: 75px; margin-bottom: 50px" id="disqus_thread"></div>
+  </div>
+</div>
+
+    </div><!-- /.container -->
+
+    <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
+    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
+    <!-- Include all compiled plugins (below), or include individual files as needed -->
+    <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
+    <script src="http://flink.apache.org/docs/master/page/js/codetabs.js"></script>
+
+    <!-- Google Analytics -->
+    <script>
+      (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
+      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+      })(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+      ga('create', 'UA-52545728-1', 'auto');
+      ga('send', 'pageview');
+    </script>
+
+    <!-- Disqus -->
+    <script type="text/javascript">
+    var disqus_shortname = 'stratosphere-eu';
+    (function() {
+        var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
+        dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
+        (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
+    })();
+</script>
+  </body>
+</html>

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/fig/LICENSE.txt
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/fig/LICENSE.txt b/content/docs/master/apis/fig/LICENSE.txt
new file mode 100644
index 0000000..35b8673
--- /dev/null
+++ b/content/docs/master/apis/fig/LICENSE.txt
@@ -0,0 +1,17 @@
+All image files in the folder and its subfolders are
+licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/fig/iterations_delta_iterate_operator.png
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/fig/iterations_delta_iterate_operator.png b/content/docs/master/apis/fig/iterations_delta_iterate_operator.png
new file mode 100644
index 0000000..470485a
Binary files /dev/null and b/content/docs/master/apis/fig/iterations_delta_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/fig/iterations_delta_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/fig/iterations_delta_iterate_operator_example.png b/content/docs/master/apis/fig/iterations_delta_iterate_operator_example.png
new file mode 100644
index 0000000..15f2b54
Binary files /dev/null and b/content/docs/master/apis/fig/iterations_delta_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/fig/iterations_iterate_operator.png
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/fig/iterations_iterate_operator.png b/content/docs/master/apis/fig/iterations_iterate_operator.png
new file mode 100644
index 0000000..aaf4158
Binary files /dev/null and b/content/docs/master/apis/fig/iterations_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/fig/iterations_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/fig/iterations_iterate_operator_example.png b/content/docs/master/apis/fig/iterations_iterate_operator_example.png
new file mode 100644
index 0000000..be4841c
Binary files /dev/null and b/content/docs/master/apis/fig/iterations_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/fig/iterations_supersteps.png
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/fig/iterations_supersteps.png b/content/docs/master/apis/fig/iterations_supersteps.png
new file mode 100644
index 0000000..331dbc7
Binary files /dev/null and b/content/docs/master/apis/fig/iterations_supersteps.png differ

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/master/apis/fig/plan_visualizer.png
----------------------------------------------------------------------
diff --git a/content/docs/master/apis/fig/plan_visualizer.png b/content/docs/master/apis/fig/plan_visualizer.png
new file mode 100644
index 0000000..85b8c55
Binary files /dev/null and b/content/docs/master/apis/fig/plan_visualizer.png differ