Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/04 13:00:20 UTC

svn commit: r1607832 [25/33] - in /incubator/flink: ./ _includes/ _layouts/ _plugins/ _posts/ blog/ css/ fonts/ img/ img/blog/ js/ site/ site/blog/ site/blog/page2/ site/css/ site/docs/ site/docs/0.6-SNAPSHOT/ site/docs/0.6-SNAPSHOT/css/ site/docs/0.6-...

Added: incubator/flink/site/docs/0.6-SNAPSHOT/java_api_guide.html
URL: http://svn.apache.org/viewvc/incubator/flink/site/docs/0.6-SNAPSHOT/java_api_guide.html?rev=1607832&view=auto
==============================================================================
--- incubator/flink/site/docs/0.6-SNAPSHOT/java_api_guide.html (added)
+++ incubator/flink/site/docs/0.6-SNAPSHOT/java_api_guide.html Fri Jul  4 11:00:15 2014
@@ -0,0 +1,1752 @@
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <title>Apache Flink (incubating): Java API Programming Guide</title>
+    <link rel="stylesheet" href="/css/bootstrap.css">
+    <link rel="stylesheet" href="/css/bootstrap-lumen-custom.css">
+    <link href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" rel="stylesheet">
+  </head>
+  <body>
+
+<nav class="navbar navbar-default navbar-static-top" role="navigation">
+  <div class="container">
+    <div class="navbar-header">
+      <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
+        <span class="sr-only">Toggle navigation</span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+      </button>
+      <a class="navbar-brand" href="/index.html">Apache Flink</a>
+    </div>
+
+    <div class="collapse navbar-collapse" id="navbar-collapse-1">
+      <ul class="nav navbar-nav">
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Quickstart <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/docs/0.6-SNAPSHOT/setup_quickstart.html">Setup Flink</a></li>
+            <li><a href="/docs/0.6-SNAPSHOT/java_api_quickstart.html">Java API</a></li>
+            <li><a href="/docs/0.6-SNAPSHOT/scala_api_quickstart.html">Scala API</a></li>
+          </ul>
+        </li>
+
+        <li>
+          <a href="/downloads.html" class="">Downloads</a>
+        </li>
+
+        <li>
+          <a href="/docs/0.6-SNAPSHOT/faq.html" class="">FAQ</a>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Documentation <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/docs/0.6-SNAPSHOT/">0.6-SNAPSHOT</a></li>
+            <li><a href="http://stratosphere-javadocs.github.io/">0.6-SNAPSHOT Javadocs</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Community <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/community.html#mailing-lists">Mailing Lists</a></li>
+            <li><a href="/community.html#issues">Issues</a></li>
+            <li><a href="/community.html#team">Team</a></li>
+            <li class="divider"></li>
+            <li><a href="/how-to-contribute.html">How To Contribute</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">ASF <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="http://www.apache.org/">Apache Software Foundation</a>
+            <li><a href="http://www.apache.org/foundation/how-it-works.html">How it works</a>
+            <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a>
+            <li><a href="http://www.apache.org/foundation/sponsorship.html">Become a Sponsor</a>
+            <li><a href="http://incubator.apache.org/projects/flink.html">Incubation Status page</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Project <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <!--<li><a href="/project.html#history">History</a></li> -->
+            <li><a href="https://wiki.apache.org/incubator/StratosphereProposal">Incubator Proposal (external)</a></li>
+            <li><a href="http://www.apache.org/licenses/LICENSE-2.0">License</a></li>
+            <li><a href="https://github.com/apache/incubator-flink">Source Code</a></li>
+          </ul>
+        </li>
+
+        <li>
+          <a href="/blog/index.html" class="">Blog</a>
+        </li>
+
+      </ul>
+    </div>
+  </div>
+</nav>
+
+    <div class="container">
+
+<div class="row">
+  <div class="col-md-3">
+    <ul>
+      <li><a href="faq.html">FAQ</a></li>
+      <li>Quickstart
+        <ul>
+          <li><a href="setup_quickstart.html">Setup</a></li>
+          <li><a href="run_example_quickstart.html">Run Example</a></li>
+          <li><a href="java_api_quickstart.html">Java API</a></li>
+          <li><a href="scala_api_quickstart.html">Scala API</a></li>
+        </ul>
+      </li>
+
+      <li>Setup &amp; Configuration
+        <ul>
+          <li><a href="local_setup.html">Local Setup</a></li>
+          <li><a href="cluster_setup.html">Cluster Setup</a></li>
+          <li><a href="yarn_setup.html">YARN Setup</a></li>
+          <li><a href="config.html">Configuration</a></li>
+        </ul>
+      </li>
+
+      <li>Programming Guides
+        <ul>
+          <li><a href="java_api_guide.html">Java API</a></li>
+          <li><a href="scala_api_guide.html">Scala API</a></li>
+          <li><a href="hadoop_compatability.html">Hadoop Compatibility</a></li>
+          <li><a href="iterations.html">Iterations</a></li>
+          <li><a href="spargel_guide.html">Spargel Graph API</a></li>
+        </ul>
+      </li>
+
+      <li>Examples
+        <ul>
+          <li><a href="java_api_examples.html">Java API</a></li>
+          <li><a href="scala_api_examples.html">Scala API</a></li>
+        </ul>
+      </li>
+
+      <li>Execution
+        <ul>
+          <li><a href="local_execution.html">Local/Debugging</a></li>
+          <li><a href="cluster_execution.html">Cluster</a></li>
+          <li><a href="cli.html">Command-Line Interface</a></li>
+          <li><a href="web_client.html">Web Interface</a></li>
+        </ul>
+      </li>
+
+      <li>Internals
+        <ul>
+          <li><a href="internal_overview.html">Overview</a></li>
+        </ul>
+      </li>
+    </ul>
+  </div>
+  <div class="col-md-9">
+      <h1>Java API Programming Guide</h1>
+
+      <ul>
+<li>
+<a href="#java-api">Java API</a>
+<ul>
+<li>
+<a href="#introduction">Introduction</a>
+</li>
+<li>
+<a href="#example-program">Example Program</a>
+</li>
+<li>
+<a href="#linking-with-stratosphere">Linking with Stratosphere</a>
+</li>
+<li>
+<a href="#program-skeleton">Program Skeleton</a>
+</li>
+<li>
+<a href="#lazy-evaluation">Lazy Evaluation</a>
+</li>
+<li>
+<a href="#data-types">Data Types</a>
+<ul>
+<li>
+<ul>
+<li>
+<a href="#regular-types">Regular Types</a>
+</li>
+<li>
+<a href="#tuples">Tuples</a>
+</li>
+<li>
+<a href="#values">Values</a>
+</li>
+<li>
+<a href="#hadoop-writables">Hadoop Writables</a>
+</li>
+<li>
+<a href="#type-erasure-&-type-inferrence">Type Erasure &amp; Type Inference</a>
+</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<a href="#data-transformations">Data Transformations</a>
+<ul>
+<li>
+<a href="#map">Map</a>
+</li>
+<li>
+<a href="#flatmap">FlatMap</a>
+</li>
+<li>
+<a href="#filter">Filter</a>
+</li>
+<li>
+<a href="#project-(tuple-datasets-only)">Project (Tuple DataSets only)</a>
+</li>
+<li>
+<a href="#transformations-on-grouped-dataset">Transformations on grouped DataSet</a>
+</li>
+<li>
+<a href="#reduce-on-grouped-dataset">Reduce on grouped DataSet</a>
+<ul>
+<li>
+<a href="#reduce-on-dataset-grouped-by-keyselector-function">Reduce on DataSet grouped by KeySelector Function</a>
+</li>
+<li>
+<a href="#reduce-on-dataset-grouped-by-field-position-keys-(tuple-datasets-only)">Reduce on DataSet grouped by Field Position Keys (Tuple DataSets only)</a>
+</li>
+</ul>
+</li>
+<li>
+<a href="#groupreduce-on-grouped-dataset">GroupReduce on grouped DataSet</a>
+<ul>
+<li>
+<a href="#groupreduce-on-dataset-grouped-by-field-position-keys-(tuple-datasets-only)">GroupReduce on DataSet grouped by Field Position Keys (Tuple DataSets only)</a>
+</li>
+<li>
+<a href="#groupreduce-on-dataset-grouped-by-keyselector-function">GroupReduce on DataSet grouped by KeySelector Function</a>
+</li>
+<li>
+<a href="#groupreduce-on-sorted-groups-(tuple-datasets-only)">GroupReduce on sorted groups (Tuple DataSets only)</a>
+</li>
+<li>
+<a href="#combinable-groupreducefunctions">Combinable GroupReduceFunctions</a>
+</li>
+</ul>
+</li>
+<li>
+<a href="#aggregate-on-grouped-tuple-dataset">Aggregate on grouped Tuple DataSet</a>
+</li>
+<li>
+<a href="#reduce-on-full-dataset">Reduce on full DataSet</a>
+</li>
+<li>
+<a href="#groupreduce-on-full-dataset">GroupReduce on full DataSet</a>
+</li>
+<li>
+<a href="#aggregate-on-full-tuple-dataset">Aggregate on full Tuple DataSet</a>
+</li>
+<li>
+<a href="#join">Join</a>
+<ul>
+<li>
+<a href="#default-join-(join-into-tuple2)">Default Join (Join into Tuple2)</a>
+</li>
+<li>
+<a href="#join-with-joinfunction">Join with JoinFunction</a>
+</li>
+<li>
+<a href="#join-with-projection">Join with Projection</a>
+</li>
+<li>
+<a href="#join-with-dataset-size-hint">Join with DataSet Size Hint</a>
+</li>
+</ul>
+</li>
+<li>
+<a href="#cross">Cross</a>
+<ul>
+<li>
+<a href="#cross-with-user-defined-function">Cross with User-Defined Function</a>
+</li>
+<li>
+<a href="#cross-with-projection">Cross with Projection</a>
+</li>
+<li>
+<a href="#cross-with-dataset-size-hint">Cross with DataSet Size Hint</a>
+</li>
+</ul>
+</li>
+<li>
+<a href="#cogroup">CoGroup</a>
+<ul>
+<li>
+<a href="#cogroup-on-datasets-grouped-by-field-position-keys-(tuple-datasets-only)">CoGroup on DataSets grouped by Field Position Keys (Tuple DataSets only)</a>
+</li>
+<li>
+<a href="#cogroup-on-datasets-grouped-by-key-selector-function">CoGroup on DataSets grouped by Key Selector Function</a>
+</li>
+</ul>
+</li>
+<li>
+<a href="#union">Union</a>
+</li>
+</ul>
+</li>
+<li>
+<a href="#data-sources">Data Sources</a>
+</li>
+<li>
+<a href="#data-sinks">Data Sinks</a>
+</li>
+<li>
+<a href="#debugging">Debugging</a>
+<ul>
+<li>
+<a href="#local-execution-environment">Local Execution Environment</a>
+</li>
+<li>
+<a href="#collection-data-sources-and-sinks">Collection Data Sources and Sinks</a>
+</li>
+</ul>
+</li>
+<li>
+<a href="#iteration-operators">Iteration Operators</a>
+<ul>
+<li>
+<ul>
+<li>
+<a href="#bulk-iterations">Bulk Iterations</a>
+</li>
+<li>
+<a href="#delta-iterations">Delta Iterations</a>
+</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<a href="#semantic-annotations">Semantic Annotations</a>
+</li>
+<li>
+<a href="#broadcast-variables">Broadcast Variables</a>
+</li>
+<li>
+<a href="#program-packaging-&-distributed-execution">Program Packaging &amp; Distributed Execution</a>
+<ul>
+<li>
+<ul>
+<li>
+<a href="#packaging-programs">Packaging Programs</a>
+</li>
+<li>
+<a href="#packaging-programs-through-plans">Packaging Programs through Plans</a>
+</li>
+<li>
+<a href="#summary">Summary</a>
+</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<a href="#accumulators-&-counters">Accumulators &amp; Counters</a>
+</li>
+<li>
+<a href="#execution-plans">Execution Plans</a>
+</li>
+</ul>
+</li>
+</ul>
+
+
+      <p><section id="top"></section></p>
+
+<h1 id="java-api">Java API</h1>
+
+<p><section id="introduction"></p>
+
+<h2 id="introduction">Introduction</h2>
+
+<p>Analysis programs in Stratosphere are regular Java programs that implement transformations on data sets (e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for example write the data to (distributed) files, or to standard output (for example the command line terminal). Stratosphere programs run in a variety of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or on clusters of many machines.</p>
+
+<p>In order to create your own Stratosphere program, we encourage you to start with the <a href="#skeleton">program skeleton</a> and gradually add your own <a href="#transformations">transformations</a>. The remaining sections act as references for additional operations and advanced features.</p>
+
+<p><section id="toc"></p>
+
+<div id="docs_05_toc">
+  <div class="list-group">
+
+  </div>
+</div>
+
+<p><section id="example"></p>
+
+<h2 id="example-program">Example Program</h2>
+
+<p>The following program is a complete, working example of WordCount, apart from the import statements. You can copy and paste the code to run it locally. You only have to include Stratosphere&#39;s Java API library in your project (see Section <a href="#linking">Linking with Stratosphere</a>) and add the imports. Then you are ready to go!</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCountExample</span> <span class="o">{</span>
+    <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
+
+        <span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span>
+            <span class="s">&quot;Who&#39;s there?&quot;</span><span class="o">,</span>
+            <span class="s">&quot;I think I hear them. Stand, ho! Who&#39;s there?&quot;</span><span class="o">);</span>
+
+        <span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">text</span>
+            <span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">LineSplitter</span><span class="o">())</span>
+            <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">aggregate</span><span class="o">(</span><span class="n">Aggregations</span><span class="o">.</span><span class="na">SUM</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span>
+
+        <span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
+
+        <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Word Count Example&quot;</span><span class="o">);</span>
+    <span class="o">}</span>
+
+    <span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">LineSplitter</span> <span class="kd">extends</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+        <span class="nd">@Override</span>
+        <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+            <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span> <span class="o">{</span>
+                <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
+            <span class="o">}</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+</code></pre></div>
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="linking"></p>
+
+<h2 id="linking-with-stratosphere">Linking with Stratosphere</h2>
+
+<p>To write programs with Stratosphere, you need to include Stratosphere’s Java API library in your project.</p>
+
+<p>The simplest way to do this is to use the <a href="java_api_quickstart.html">quickstart scripts</a>. They create a blank project from a template (a Maven Archetype), which sets up everything for you. To manually create the project, you can use the archetype and create a project by calling:</p>
+<div class="highlight"><pre><code class="language-bash" data-lang="bash">mvn archetype:generate \
+    -DarchetypeGroupId<span class="o">=</span>eu.stratosphere \
+    -DarchetypeArtifactId<span class="o">=</span>quickstart-java \
+    -DarchetypeVersion<span class="o">=</span>0.5.1
+</code></pre></div>
+<p>If you want to add Stratosphere to an existing Maven project, add the following entry to your <em>dependencies</em> section in the <em>pom.xml</em> file of your project:</p>
+<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
+  <span class="nt">&lt;groupId&gt;</span>eu.stratosphere<span class="nt">&lt;/groupId&gt;</span>
+  <span class="nt">&lt;artifactId&gt;</span>stratosphere-java<span class="nt">&lt;/artifactId&gt;</span>
+  <span class="nt">&lt;version&gt;</span>0.5.1<span class="nt">&lt;/version&gt;</span>
+<span class="nt">&lt;/dependency&gt;</span>
+<span class="nt">&lt;dependency&gt;</span>
+  <span class="nt">&lt;groupId&gt;</span>eu.stratosphere<span class="nt">&lt;/groupId&gt;</span>
+  <span class="nt">&lt;artifactId&gt;</span>stratosphere-clients<span class="nt">&lt;/artifactId&gt;</span>
+  <span class="nt">&lt;version&gt;</span>0.5.1<span class="nt">&lt;/version&gt;</span>
+<span class="nt">&lt;/dependency&gt;</span>
+</code></pre></div>
+<p>In order to link against the latest SNAPSHOT versions of the code, please follow <a href="/downloads.html/#nightly">this guide</a>.</p>
+
+<p>The <em>stratosphere-clients</em> dependency is only necessary to invoke the Stratosphere program locally (for example to run it standalone for testing and debugging). 
+If you intend to only export the program as a JAR file and <a href="cluster_execution.html">run it on a cluster</a>, you can skip that dependency.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="skeleton"></p>
+
+<h2 id="program-skeleton">Program Skeleton</h2>
+
+<p>As we already saw in the example, Stratosphere programs look like regular Java
+programs with a <code>main()</code> method. Each program consists of the same basic parts:</p>
+
+<ol>
+<li>Obtain an <code>ExecutionEnvironment</code>,</li>
+<li>Load/create the initial data,</li>
+<li>Specify transformations on this data,</li>
+<li>Specify where to put the results of your computations, and</li>
+<li>Execute your program.</li>
+</ol>
+
+<p>We will now give an overview of each of those steps; please refer
+to the respective sections for more details. Note that all <a href="https://github.com/apache/incubator-flink/blob/master//stratosphere-java/src/main/java/eu/stratosphere/api/java">core classes of the Java API</a> are found in the package <code>eu.stratosphere.api.java</code>.</p>
+
+<p>The <code>ExecutionEnvironment</code> is the basis for all Stratosphere programs. You can
+obtain one using these static methods on class <code>ExecutionEnvironment</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">getExecutionEnvironment</span><span class="o">()</span>
+
+<span class="n">createLocalEnvironment</span><span class="o">()</span>
+<span class="n">createLocalEnvironment</span><span class="o">(</span><span class="kt">int</span> <span class="n">degreeOfParallelism</span><span class="o">)</span>
+
+<span class="n">createRemoteEnvironment</span><span class="o">(</span><span class="n">String</span> <span class="n">host</span><span class="o">,</span> <span class="kt">int</span> <span class="n">port</span><span class="o">,</span> <span class="n">String</span><span class="o">...</span> <span class="n">jarFiles</span><span class="o">)</span>
+<span class="n">createRemoteEnvironment</span><span class="o">(</span><span class="n">String</span> <span class="n">host</span><span class="o">,</span> <span class="kt">int</span> <span class="n">port</span><span class="o">,</span> <span class="kt">int</span> <span class="n">degreeOfParallelism</span><span class="o">,</span> <span class="n">String</span><span class="o">...</span> <span class="n">jarFiles</span><span class="o">)</span>
+</code></pre></div>
+<p>Typically, you only need to use <code>getExecutionEnvironment()</code>, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Java program, it will create
+a local environment that executes your program on your local machine. If
+you created a JAR file from your program and invoke it through the <a href="cli.html">command line</a>
+or the <a href="web_client.html">web interface</a>,
+the Stratosphere cluster manager will
+execute your main method, and <code>getExecutionEnvironment()</code> will return
+an execution environment for executing your program on a cluster.</p>
+
+<p>To specify data sources, the execution environment offers several methods
+for reading from files: you can read them line by line,
+as CSV files, or using completely custom data input formats. To just read
+a text file as a sequence of lines, you could use:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">ExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">ExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">readTextFile</span><span class="o">(</span><span class="s">&quot;file:///path/to/file&quot;</span><span class="o">);</span>
+</code></pre></div>
+<p>This will give you a <code>DataSet</code> on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+<a href="#data_sources">Data Sources</a>.</p>
+
+<p>Once you have a <code>DataSet</code> you can apply transformations to create a new
+<code>DataSet</code> which you can then write to a file, transform again, or
+combine with other <code>DataSet</code>s. You apply transformations by calling
+methods on <code>DataSet</code> with your own custom transformation function. For example,
+map looks like this:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">tokenized</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">});</span>
+</code></pre></div>
+<p>This will create a new <code>DataSet</code> by converting every String in the original
+set to an Integer. For more information and a list of all the transformations,
+please refer to <a href="#transformations">Transformations</a>.</p>
+
+<p>Once you have a <code>DataSet</code> that needs to be written to disk you call one
+of these methods on <code>DataSet</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">writeAsText</span><span class="o">(</span><span class="n">String</span> <span class="n">path</span><span class="o">)</span>
+<span class="n">writeAsCsv</span><span class="o">(</span><span class="n">String</span> <span class="n">path</span><span class="o">)</span>
+<span class="n">write</span><span class="o">(</span><span class="n">FileOutputFormat</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">outputFormat</span><span class="o">,</span> <span class="n">String</span> <span class="n">filePath</span><span class="o">)</span>
+
+<span class="n">print</span><span class="o">()</span>
+</code></pre></div>
+<p>The first two methods do as their names suggest, and the third one can be used to specify a
+custom data output format. The last method, <code>print()</code>, is only useful for developing and debugging on a local machine:
+it outputs the contents of the <code>DataSet</code> to standard output. (Note that in
+a cluster, the result goes to the standard out stream of the cluster nodes and ends
+up in the <em>.out</em> files of the workers.)
+Keep in mind that these calls do not actually
+write anything yet. Only when your program is completely specified and you
+call the <code>execute</code> method on your <code>ExecutionEnvironment</code> are the
+transformations executed and the data written. Please refer
+to <a href="#data_sinks">Data Sinks</a> for more information on writing to files and
+on custom data output formats.</p>
+
+<p>Once you have specified the complete program, you need to call <code>execute</code> on
+the <code>ExecutionEnvironment</code>. This will either execute on your local
+machine or submit your program for execution on a cluster, depending on
+how you created the execution environment.
+<a href="#top">Back to top</a></p>
+
+<p><section id="lazyeval"></p>
+
+<h2 id="lazy-evaluation">Lazy Evaluation</h2>
+
+<p>All Stratosphere programs are executed lazily: When the program&#39;s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program&#39;s plan. The operations are actually executed when one of the <code>execute()</code> methods is invoked on the ExecutionEnvironment object. Whether the program is executed locally or on a cluster depends on the environment of the program.</p>
+
+<p>The lazy evaluation lets you construct sophisticated programs that Stratosphere executes as one holistically planned unit.</p>
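+
+<p>The idea can be sketched in plain Java, independent of Stratosphere. The hypothetical <code>Plan</code> class below is an assumption made purely for illustration and is not part of the Stratosphere API: its <code>map()</code> method only records an operation, and user code runs only once <code>execute()</code> is called. It is a single-threaded, in-memory sketch and says nothing about how Stratosphere actually builds or optimizes its plan.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+// Hypothetical illustration of deferred execution; not Stratosphere code.
+public class LazyPlanSketch {
+
+    /** Records operations and applies them only when execute() is called. */
+    static final class Plan {
+        private final List&lt;Integer&gt; source;
+        private final List&lt;UnaryOperator&lt;Integer&gt;&gt; steps = new ArrayList&lt;&gt;();
+
+        Plan(List&lt;Integer&gt; source) {
+            this.source = source;
+        }
+
+        // Adds a step to the plan; no data is touched here.
+        Plan map(UnaryOperator&lt;Integer&gt; step) {
+            steps.add(step);
+            return this;
+        }
+
+        // Applies all recorded steps, in order, to every source element.
+        List&lt;Integer&gt; execute() {
+            List&lt;Integer&gt; result = new ArrayList&lt;&gt;();
+            for (Integer element : source) {
+                Integer value = element;
+                for (UnaryOperator&lt;Integer&gt; step : steps) {
+                    value = step.apply(value);
+                }
+                result.add(value);
+            }
+            return result;
+        }
+    }
+
+    public static void main(String[] args) {
+        int[] calls = {0}; // counts how often the user function actually runs
+
+        Plan plan = new Plan(Arrays.asList(1, 2, 3))
+            .map(x -&gt; { calls[0]++; return x * 10; });
+
+        System.out.println("calls before execute: " + calls[0]); // 0
+        List&lt;Integer&gt; result = plan.execute();
+        System.out.println("calls after execute: " + calls[0]);  // 3
+        System.out.println(result);                              // [10, 20, 30]
+    }
+}
+</code></pre></div>
+<p>Because nothing runs while the plan is being assembled, a system that sees the whole plan before execution is free to plan it as one unit, which is the property the paragraph above relies on.</p>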
+
+<p><section id="types"></p>
+
+<h2 id="data-types">Data Types</h2>
+
+<p>The Java API is strongly typed: All data sets and transformations accept typed elements. This catches type errors very early and supports safe refactoring of programs. The API supports various different data types for the input and output of operators. Both <code>DataSet</code> and functions like <code>MapFunction</code>, <code>ReduceFunction</code>, etc. are parameterized with data types using Java generics in order to ensure type-safety.</p>
+
+<p>There are four different categories of data types, which are treated slightly differently:</p>
+
+<ol>
+<li><strong>Regular Types</strong></li>
+<li><strong>Tuples</strong></li>
+<li><strong>Values</strong></li>
+<li><strong>Hadoop Writables</strong></li>
+</ol>
+
+<h4 id="regular-types">Regular Types</h4>
+
+<p>Out of the box, the Java API supports all common basic Java types: <code>Byte</code>, <code>Short</code>, <code>Integer</code>, <code>Long</code>, <code>Float</code>, <code>Double</code>, <code>Boolean</code>, <code>Character</code>, <code>String</code>.</p>
+
+<p>Furthermore, you can use the vast majority of custom Java classes. Restrictions apply to classes containing fields that cannot be serialized, such as file pointers, I/O streams, or other native resources. Classes that follow the Java Beans conventions work well in general. The following defines a simple example class to illustrate how you can use custom classes:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCount</span> <span class="o">{</span>
+
+    <span class="kd">public</span> <span class="n">String</span> <span class="n">word</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;</span>
+
+    <span class="kd">public</span> <span class="nf">WordCount</span><span class="o">()</span> <span class="o">{}</span>
+
+    <span class="kd">public</span> <span class="nf">WordCount</span><span class="o">(</span><span class="n">String</span> <span class="n">word</span><span class="o">,</span> <span class="kt">int</span> <span class="n">count</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">word</span> <span class="o">=</span> <span class="n">word</span><span class="o">;</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">count</span> <span class="o">=</span> <span class="n">count</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+</code></pre></div>
+<p>You can use all of those types to parameterize <code>DataSet</code> and function implementations, e.g. <code>DataSet&lt;String&gt;</code> for a <code>String</code> data set or <code>MapFunction&lt;String, Integer&gt;</code> for a mapper from <code>String</code> to <code>Integer</code>.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// using a basic data type</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">numbers</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span><span class="s">&quot;1&quot;</span><span class="o">,</span> <span class="s">&quot;2&quot;</span><span class="o">);</span>
+
+<span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">Integer</span><span class="o">.</span><span class="na">parseInt</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">});</span>
+
+<span class="c1">// using a custom class</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WordCount</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span>
+    <span class="k">new</span> <span class="nf">WordCount</span><span class="o">(</span><span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="mi">1</span><span class="o">),</span>
+    <span class="k">new</span> <span class="nf">WordCount</span><span class="o">(</span><span class="s">&quot;world&quot;</span><span class="o">,</span> <span class="mi">2</span><span class="o">));</span>
+
+<span class="n">wordCounts</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">WordCount</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">WordCount</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">count</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">});</span>
+</code></pre></div>
+<p>When working with operators that require a Key for grouping or matching records,
+you need to implement a <code>KeySelector</code> for your custom type (see
+<a href="#transformations">Section Data Transformations</a>).</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">wordCounts</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="k">new</span> <span class="n">KeySelector</span><span class="o">&lt;</span><span class="n">WordCount</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="kd">public</span> <span class="n">String</span> <span class="nf">getKey</span><span class="o">(</span><span class="n">WordCount</span> <span class="n">v</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">v</span><span class="o">.</span><span class="na">word</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}).</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyReduceFunction</span><span class="o">());</span>
+</code></pre></div>
+<h4 id="tuples">Tuples</h4>
+
+<p>You can use the <code>Tuple</code> classes for composite types. Tuples contain a fixed number of fields of various types. The Java API provides classes from <code>Tuple1</code> up to <code>Tuple25</code>. Every field of a tuple can be an arbitrary Stratosphere type, including further tuples, resulting in nested tuples. Fields of a Tuple can be accessed directly by field name, e.g. <code>tuple.f4</code>, or using the generic getter method <code>tuple.getField(int position)</code>. The field numbering starts with 0. Note that this stands in contrast to Scala tuples, but it is more consistent with Java&#39;s general indexing.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">fromElements</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="s">&quot;hello&quot;</span><span class="o">,</span> <span class="mi">1</span><span class="o">),</span>
+    <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="s">&quot;world&quot;</span><span class="o">,</span> <span class="mi">2</span><span class="o">));</span>
+
+<span class="n">wordCounts</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Integer</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">});</span>
+</code></pre></div>
+<p>When working with operators that require a Key for grouping or matching records,
+Tuples let you simply specify the positions of the fields to be used as key. You can specify more
+than one position to use composite keys (see <a href="#transformations">Section Data Transformations</a>).</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">wordCounts</span>
+    <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyReduceFunction</span><span class="o">());</span>
+</code></pre></div>
+<p>To access fields more intuitively and to write more readable code, it is also possible to extend one of the <code>Tuple</code> classes. You can add getters and setters with custom names that delegate to the field positions. See this <a href=https://github.com/apache/incubator-flink/blob/master//stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/relational/TPCHQuery3.java>example</a> for an illustration of how to make use of that mechanism.</p>
+
+<h4 id="values">Values</h4>
+
+<p><em>Value</em> types describe their serialization and deserialization manually. Instead of going through a general purpose serialization framework, they provide custom code for those operations by implementing the <code>eu.stratosphere.types.Value</code> interface with the methods <code>read</code> and <code>write</code>. Using a <em>Value</em> type is reasonable when general purpose serialization would be highly inefficient. An example would be a data type that implements a sparse vector of elements as an array. Knowing that the array is mostly zero, one can use a special encoding for the non-zero elements, while the general purpose serialization would simply write all array elements.</p>
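+
+<p>The sparse vector idea can be sketched in plain Java. The class below is a hypothetical, standalone illustration: it does not implement <code>eu.stratosphere.types.Value</code>, and the use of <code>java.io.DataOutput</code> and <code>java.io.DataInput</code> as parameter types is an assumption made for the sketch. It only shows the encoding, where the length and the non-zero entries are written instead of every array element.</p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of a sparse encoding; not part of the Stratosphere/Flink API.
final class SparseVectorSketch {

    double[] values;

    SparseVectorSketch() {
        this.values = new double[0];
    }

    SparseVectorSketch(double[] values) {
        this.values = values;
    }

    // Writes the vector length, the number of non-zero entries,
    // and then one (index, value) pair per non-zero entry.
    void write(DataOutput out) throws IOException {
        int nonZero = 0;
        for (double v : values) {
            if (v != 0.0) {
                nonZero++;
            }
        }
        out.writeInt(values.length);
        out.writeInt(nonZero);
        for (int i = 0; i < values.length; i++) {
            if (values[i] != 0.0) {
                out.writeInt(i);
                out.writeDouble(values[i]);
            }
        }
    }

    // Rebuilds the dense array; entries that were not written stay 0.0.
    void read(DataInput in) throws IOException {
        values = new double[in.readInt()];
        int nonZero = in.readInt();
        for (int k = 0; k < nonZero; k++) {
            int index = in.readInt();
            values[index] = in.readDouble();
        }
    }

    // Helper for trying the encoding out: serializes into a byte array.
    static byte[] serialize(SparseVectorSketch vector) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            vector.write(new DataOutputStream(buffer));
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper for trying the encoding out: reads a vector back from bytes.
    static SparseVectorSketch deserialize(byte[] bytes) {
        try {
            SparseVectorSketch vector = new SparseVectorSketch();
            vector.read(new DataInputStream(new ByteArrayInputStream(bytes)));
            return vector;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

+<p>With this encoding, a vector of 1000 elements with two non-zero entries takes 32 bytes, compared to 8000 bytes for writing every <code>double</code>.</p>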
+
+<p>The <code>eu.stratosphere.types.CopyableValue</code> interface supports manual internal cloning logic in a similar way.</p>
+
+<p>Stratosphere comes with pre-defined Value types that correspond to Java&#39;s basic data types (<code>ByteValue</code>, <code>ShortValue</code>, <code>IntValue</code>, <code>LongValue</code>, <code>FloatValue</code>, <code>DoubleValue</code>, <code>StringValue</code>, <code>CharValue</code>, <code>BooleanValue</code>). These Value types act as mutable variants of the basic data types: their value can be altered, allowing programmers to reuse objects and take pressure off the garbage collector.</p>
+
+<h4 id="hadoop-writables">Hadoop Writables</h4>
+
+<p>You can use types that implement the <code>org.apache.hadoop.io.Writable</code> interface. The serialization logic defined in the <code>write()</code> and <code>readFields()</code> methods will be used for serialization.</p>
+
+<h4 id="type-erasure-&amp;-type-inferrence">Type Erasure &amp; Type Inference</h4>
+
+<p>The Java compiler throws away much of the generic type information after the compilation. This is known as <em>type erasure</em> in Java. It means that at runtime, an instance of an object does not know its generic type any more. For example, instances of <code>DataSet&lt;String&gt;</code> and <code>DataSet&lt;Long&gt;</code> look the same to the JVM.</p>
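+
+<p>The effect can be observed with plain Java collections. The following minimal sketch uses only <code>java.util</code> classes (the class name <code>ErasureDemo</code> is made up for illustration) and checks that two lists with different element types share the same runtime class:</p>

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of type erasure; no Stratosphere/Flink classes involved.
final class ErasureDemo {

    // Returns true if both lists have the same runtime class,
    // i.e. the element type is no longer visible at runtime.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<String>();
        List<Long> longs = new ArrayList<Long>();
        return strings.getClass() == longs.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass()); // prints "true"
    }
}
```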
+
+<p>Stratosphere requires type information at the time when it prepares the program for execution (when the main method of the program is called). The Stratosphere Java API tries to reconstruct the type information that was thrown away in various ways and store it explicitly in the data sets and operators. You can retrieve the type via <code>DataSet.getType()</code>. The method returns an instance of <code>TypeInformation</code>, which is Stratosphere&#39;s internal way of representing types.</p>
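+
+<p>One general way to recover such information in Java is reflection on the class signature: type arguments that a class declares in its <code>extends</code> clause are kept in the class file, unlike those of a plain instance. The sketch below illustrates that mechanism only; it is not the API&#39;s actual implementation, and <code>Mapper</code> and <code>TypeRecoveryDemo</code> are hypothetical names.</p>

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a generic user function; not a Stratosphere/Flink class.
abstract class Mapper<I, O> {
    abstract O map(I value);
}

final class TypeRecoveryDemo {

    // Reads the type arguments that a direct subclass of Mapper declared
    // in its "extends" clause. These survive compilation in the class file.
    static Type[] typeArgumentsOf(Mapper<?, ?> function) {
        Type superType = function.getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalArgumentException("no generic type information available");
        }
        return ((ParameterizedType) superType).getActualTypeArguments();
    }

    // Builds an anonymous Mapper<String, Integer> and describes its types.
    static String describeParser() {
        Mapper<String, Integer> parser = new Mapper<String, Integer>() {
            @Override
            Integer map(String value) {
                return Integer.parseInt(value);
            }
        };
        Type[] arguments = typeArgumentsOf(parser);
        return arguments[0].getTypeName() + " -> " + arguments[1].getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(describeParser()); // prints "java.lang.String -> java.lang.Integer"
    }
}
```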
+
+<p>The type inference has its limits and needs the &quot;cooperation&quot; of the programmer in some cases. Examples are methods that create data sets from collections, such as <code>ExecutionEnvironment.fromCollection()</code>, where you can pass an argument that describes the type. Generic functions like <code>MapFunction&lt;I, O&gt;</code> may also need extra type information.</p>
+
+<p>The <a href=https://github.com/apache/incubator-flink/blob/master//stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/ResultTypeQueryable.java>ResultTypeQueryable</a> interface can be implemented by input formats and functions to tell the API explicitly about their return type. The <em>input types</em> that the functions are invoked with can usually be inferred by the result types of the previous operations.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="transformations"></p>
+
+<h2 id="data-transformations">Data Transformations</h2>
+
+<p>A data transformation transforms one or more <code>DataSet</code>s into a new <code>DataSet</code>. Advanced data analysis programs can be assembled by chaining multiple transformations.</p>
+
+<h3 id="map">Map</h3>
+
+<p>The Map transformation applies a user-defined <code>MapFunction</code> on each element of a DataSet.
+It implements a one-to-one mapping, that is, exactly one element must be returned by
+the function.</p>
+
+<p>The following code transforms a <code>DataSet</code> of Integer pairs into a <code>DataSet</code> of Integers:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// MapFunction that adds two integer values</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">IntAdder</span> <span class="kd">extends</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">in</span><span class="o">.</span><span class="na">f0</span> <span class="o">+</span> <span class="n">in</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">intPairs</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">intSums</span> <span class="o">=</span> <span class="n">intPairs</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">IntAdder</span><span class="o">());</span>
+</code></pre></div>
+<h3 id="flatmap">FlatMap</h3>
+
+<p>The FlatMap transformation applies a user-defined <code>FlatMapFunction</code> on each element of a <code>DataSet</code>.
+This variant of a map function can return arbitrarily many result elements (including none) for each input element.</p>
+
+<p>The following code transforms a <code>DataSet</code> of text lines into a <code>DataSet</code> of words:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// FlatMapFunction that splits a String at non-word characters and emits all String tokens.</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">Tokenizer</span> <span class="kd">extends</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">token</span> <span class="o">:</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W&quot;</span><span class="o">))</span> <span class="o">{</span>
+      <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">token</span><span class="o">);</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">textLines</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">textLines</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">());</span>
+</code></pre></div>
+<h3 id="filter">Filter</h3>
+
+<p>The Filter transformation applies a user-defined <code>FilterFunction</code> on each element of a <code>DataSet</code> and retains only those elements for which the function returns <code>true</code>.</p>
+
+<p>The following code removes all Integers smaller than zero from a <code>DataSet</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// FilterFunction that filters out all Integers smaller than zero.</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">NaturalNumberFilter</span> <span class="kd">extends</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Integer</span> <span class="n">number</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">number</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">intNumbers</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">naturalNumbers</span> <span class="o">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="nf">NaturalNumberFilter</span><span class="o">());</span>
+</code></pre></div>
+<h3 id="project-(tuple-datasets-only)">Project (Tuple DataSets only)</h3>
+
+<p>The Project transformation removes or moves <code>Tuple</code> fields of a <code>Tuple</code> <code>DataSet</code>.
+The <code>project(int...)</code> method selects <code>Tuple</code> fields that should be retained by their index and defines their order in the output <code>Tuple</code>.
+The <code>types(Class&lt;?&gt; ...)</code> method must give the types of the output <code>Tuple</code> fields.</p>
+
+<p>Projections do not require the definition of a user function.</p>
+
+<p>The following code shows how to apply a Project transformation on a <code>DataSet</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="c1">// converts Tuple3&lt;Integer, Double, String&gt; into Tuple2&lt;String, Integer&gt;</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span><span class="mi">0</span><span class="o">).</span><span class="na">types</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">Integer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
+</code></pre></div>
+<h3 id="transformations-on-grouped-dataset">Transformations on grouped DataSet</h3>
+
+<p>The reduce operations can operate on grouped data sets. Specifying the key to
+be used for grouping can be done in two ways:</p>
+
+<ul>
+<li>a <code>KeySelector</code> function or</li>
+<li>one or more field position keys (<code>Tuple</code> <code>DataSet</code> only).</li>
+</ul>
+
+<p>Please look at the reduce examples to see how the grouping keys are specified.</p>
+
+<h3 id="reduce-on-grouped-dataset">Reduce on grouped DataSet</h3>
+
+<p>A Reduce transformation that is applied on a grouped <code>DataSet</code> reduces each group to a single element using a user-defined <code>ReduceFunction</code>.
+For each group of input elements, a <code>ReduceFunction</code> successively combines pairs of elements into one element until only a single element for each group remains.</p>
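+
+<p>The pairwise combination can be sketched with plain Java collections. This is a single-machine illustration of the semantics only, not how the system executes a Reduce; <code>GroupedReduceSketch</code> and <code>reduceByKey</code> are hypothetical names, and the standard <code>java.util.function</code> interfaces stand in for <code>KeySelector</code> and <code>ReduceFunction</code>.</p>

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical single-machine sketch of "reduce on a grouped data set".
final class GroupedReduceSketch {

    // Groups elements by key and combines the elements of each group
    // pairwise until one element per group remains.
    static <T, K> Map<K, T> reduceByKey(List<T> elements,
                                        Function<T, K> keySelector,
                                        BinaryOperator<T> reducer) {
        Map<K, T> result = new LinkedHashMap<K, T>();
        for (T element : elements) {
            K key = keySelector.apply(element);
            T previous = result.get(key);
            // The first element of a group is kept as is; later ones are combined.
            result.put(key, previous == null ? element : reducer.apply(previous, element));
        }
        return result;
    }

    // Groups words by their length and concatenates the words of each group.
    static Map<Integer, String> example() {
        List<String> words = Arrays.asList("a", "bb", "cc", "d");
        Function<String, Integer> byLength = word -> word.length();
        BinaryOperator<String> concat = (left, right) -> left + right;
        return reduceByKey(words, byLength, concat);
    }

    public static void main(String[] args) {
        System.out.println(example()); // prints "{1=ad, 2=bbcc}"
    }
}
```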
+
+<h4 id="reduce-on-dataset-grouped-by-keyselector-function">Reduce on DataSet grouped by KeySelector Function</h4>
+
+<p>A <code>KeySelector</code> function extracts a key value from each element of a <code>DataSet</code>. The extracted key value is used to group the <code>DataSet</code>.
+The following code shows how to group a POJO <code>DataSet</code> using a <code>KeySelector</code> function and to reduce it with a <code>ReduceFunction</code>.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// some ordinary POJO</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WC</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="n">String</span> <span class="n">word</span><span class="o">;</span>
+  <span class="kd">public</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;</span>
+  <span class="c1">// [...]</span>
+<span class="o">}</span>
+
+<span class="c1">// ReduceFunction that sums Integer attributes of a POJO</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCounter</span> <span class="kd">extends</span> <span class="n">ReduceFunction</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">WC</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">WC</span> <span class="n">in1</span><span class="o">,</span> <span class="n">WC</span> <span class="n">in2</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="k">new</span> <span class="nf">WC</span><span class="o">(</span><span class="n">in1</span><span class="o">.</span><span class="na">word</span><span class="o">,</span> <span class="n">in1</span><span class="o">.</span><span class="na">count</span> <span class="o">+</span> <span class="n">in2</span><span class="o">.</span><span class="na">count</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span>
+                         <span class="c1">// DataSet grouping with inline-defined KeySelector function</span>
+                         <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span>
+                           <span class="k">new</span> <span class="n">KeySelector</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+                             <span class="kd">public</span> <span class="n">String</span> <span class="nf">getKey</span><span class="o">(</span><span class="n">WC</span> <span class="n">wc</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">wc</span><span class="o">.</span><span class="na">word</span><span class="o">;</span> <span class="o">}</span>
+                           <span class="o">})</span>
+                         <span class="c1">// apply ReduceFunction on grouped DataSet</span>
+                         <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">WordCounter</span><span class="o">());</span>
+</code></pre></div>
+<h4 id="reduce-on-dataset-grouped-by-field-position-keys-(tuple-datasets-only)">Reduce on DataSet grouped by Field Position Keys (Tuple DataSets only)</h4>
+
+<p>Field position keys specify one or more fields of a <code>Tuple</code> <code>DataSet</code> that are used as grouping keys.
+The following code shows how to use field position keys and apply a <code>ReduceFunction</code>.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">tuples</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">reducedTuples</span> <span class="o">=</span>
+                                         <span class="n">tuples</span>
+                                         <span class="c1">// group DataSet on first and second field of Tuple</span>
+                                         <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">1</span><span class="o">)</span>
+                                         <span class="c1">// apply ReduceFunction on grouped DataSet</span>
+                                         <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyTupleReducer</span><span class="o">());</span>
+</code></pre></div>
+<h3 id="groupreduce-on-grouped-dataset">GroupReduce on grouped DataSet</h3>
+
+<p>A GroupReduce transformation that is applied on a grouped <code>DataSet</code> calls a user-defined <code>GroupReduceFunction</code> for each group. The difference
+between this and <code>Reduce</code> is that the user-defined function gets the whole group at once.
+The function is invoked with an iterator over all elements of a group and can return an arbitrary number of result elements using the collector.</p>
+
+<h4 id="groupreduce-on-dataset-grouped-by-field-position-keys-(tuple-datasets-only)">GroupReduce on DataSet grouped by Field Position Keys (Tuple DataSets only)</h4>
+
+<p>The following code shows how duplicate strings can be removed from a <code>DataSet</code> grouped by Integer.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">DistinctReduce</span>
+         <span class="kd">extends</span> <span class="n">GroupReduceFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+  <span class="c1">// Set to hold all unique strings of a group</span>
+  <span class="n">Set</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">uniqStrings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;();</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="c1">// clear set</span>
+    <span class="n">uniqStrings</span><span class="o">.</span><span class="na">clear</span><span class="o">();</span>
+    <span class="c1">// there is at least one element in the iterator</span>
+    <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">first</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
+    <span class="n">Integer</span> <span class="n">key</span> <span class="o">=</span> <span class="n">first</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
+    <span class="n">uniqStrings</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">first</span><span class="o">.</span><span class="na">f1</span><span class="o">);</span>
+    <span class="c1">// add all strings of the group to the set</span>
+    <span class="k">while</span><span class="o">(</span><span class="n">in</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+      <span class="n">uniqStrings</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">in</span><span class="o">.</span><span class="na">next</span><span class="o">().</span><span class="na">f1</span><span class="o">);</span>
+    <span class="o">}</span>
+    <span class="c1">// emit all unique strings</span>
+    <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">t</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;(</span><span class="n">key</span><span class="o">,</span> <span class="s">&quot;&quot;</span><span class="o">);</span>
+    <span class="k">for</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">uniqStrings</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">t</span><span class="o">.</span><span class="na">f1</span> <span class="o">=</span> <span class="n">s</span><span class="o">;</span>
+      <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">t</span><span class="o">);</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span>
+                                 <span class="n">input</span>
+                                 <span class="c1">// group DataSet by the first tuple field</span>
+                                 <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+                                 <span class="c1">// apply GroupReduceFunction on each group and</span>
+                                 <span class="c1">//   remove elements with duplicate strings.</span>
+                                 <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">DistinctReduce</span><span class="o">());</span>
+</code></pre></div>
+<p><strong>Note:</strong> Flink internally works a lot with mutable objects. Collecting objects as in the example above only works because Strings are immutable in Java!</p>
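+
+<p>The note above can be illustrated without any Flink classes. The following is a minimal plain-Java sketch, not Flink code: <code>ObjectReuseSketch</code>, <code>Pair</code>, and <code>reusingIterator</code> are hypothetical names, and the iterator that overwrites one shared record is an assumption made for illustration. It contrasts keeping references to a reused mutable record with keeping only its immutable <code>String</code> field.</p>

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class ObjectReuseSketch {

  // Hypothetical mutable record, standing in for a tuple that a runtime reuses.
  static final class Pair {
    int key;
    String value;
  }

  // Iterator that hands out the SAME Pair instance on every call to next(),
  // overwriting its fields each time (an assumption made for illustration).
  static Iterator<Pair> reusingIterator(final List<String> values) {
    final Pair shared = new Pair();
    final Iterator<String> it = values.iterator();
    return new Iterator<Pair>() {
      @Override
      public boolean hasNext() {
        return it.hasNext();
      }

      @Override
      public Pair next() {
        shared.key = 1;
        shared.value = it.next();
        return shared;
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  // Unsafe: keeps references to the reused record, so every kept entry ends
  // up showing the last value that was written into the shared instance.
  static List<String> keepRecords(List<String> values) {
    List<Pair> kept = new ArrayList<Pair>();
    Iterator<Pair> in = reusingIterator(values);
    while (in.hasNext()) {
      kept.add(in.next());
    }
    List<String> seen = new ArrayList<String>();
    for (Pair p : kept) {
      seen.add(p.value);
    }
    return seen;
  }

  // Safe: keeps only the immutable String field of each record.
  static Set<String> keepStrings(List<String> values) {
    Set<String> kept = new HashSet<String>();
    Iterator<Pair> in = reusingIterator(values);
    while (in.hasNext()) {
      kept.add(in.next().value);
    }
    return kept;
  }
}
```

+<p>In this sketch, <code>keepRecords</code> loses all but the last value, while <code>keepStrings</code> retains every distinct string. With a mutable field type, the second variant would need an explicit copy of each value.</p>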
+
+<h4 id="groupreduce-on-dataset-grouped-by-keyselector-function">GroupReduce on DataSet grouped by KeySelector Function</h4>
+
+<p>Works analogously to <code>KeySelector</code> functions in Reduce transformations.</p>
+
+<h4 id="groupreduce-on-sorted-groups-(tuple-datasets-only)">GroupReduce on sorted groups (Tuple DataSets only)</h4>
+
+<p>A <code>GroupReduceFunction</code> accesses the elements of a group using an iterator. Optionally, the iterator can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined <code>GroupReduceFunction</code> and improve its efficiency.
+Right now, this feature is only available for <code>Tuple</code> <code>DataSet</code>s.</p>
+
+<p>The following code shows another example of how to remove duplicate Strings in a <code>DataSet</code> grouped by an Integer and sorted by String.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// GroupReduceFunction that removes consecutive identical elements</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">DistinctReduce</span>
+         <span class="kd">extends</span> <span class="n">GroupReduceFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="c1">// there is at least one element in the iterator</span>
+    <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">first</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
+    <span class="n">Integer</span> <span class="n">key</span> <span class="o">=</span> <span class="n">first</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
+    <span class="n">String</span> <span class="n">comp</span> <span class="o">=</span> <span class="n">first</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+    <span class="c1">// for each element in group</span>
+    <span class="k">while</span><span class="o">(</span><span class="n">in</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+      <span class="n">String</span> <span class="n">next</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">next</span><span class="o">().</span><span class="na">f1</span><span class="o">;</span>
+      <span class="c1">// check if strings are different</span>
+      <span class="k">if</span><span class="o">(!</span><span class="n">next</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">comp</span><span class="o">))</span> <span class="o">{</span>
+        <span class="c1">// emit a new element</span>
+        <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;(</span><span class="n">key</span><span class="o">,</span> <span class="n">comp</span><span class="o">));</span>
+        <span class="c1">// update compare string</span>
+        <span class="n">comp</span> <span class="o">=</span> <span class="n">next</span><span class="o">;</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+    <span class="c1">// emit last element</span>
+    <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;(</span><span class="n">key</span><span class="o">,</span> <span class="n">comp</span><span class="o">));</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
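+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span>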
+                         <span class="c1">// group DataSet by the first tuple field</span>
+                         <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+                         <span class="c1">// sort groups on second tuple field</span>
+                         <span class="o">.</span><span class="na">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">)</span>
+                         <span class="c1">// apply GroupReduceFunction on DataSet with sorted groups</span>
+                         <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">DistinctReduce</span><span class="o">());</span>
+</code></pre></div>
+<p><strong>Note:</strong> A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.</p>
+
+<h4 id="combinable-groupreducefunctions">Combinable GroupReduceFunctions</h4>
+
+<p>In contrast to a <code>ReduceFunction</code>, a <code>GroupReduceFunction</code> is not implicitly combinable. In order to make a <code>GroupReduceFunction</code> combinable, you need to implement (override) the <code>combine()</code> method and annotate the <code>GroupReduceFunction</code> with the <code>@Combinable</code> annotation.</p>
+
+<p>The following code shows how to compute multiple sums using a combinable <code>GroupReduceFunction</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Combinable GroupReduceFunction that computes two sums.</span>
+<span class="nd">@Combinable</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCombinableGroupReducer</span>
+         <span class="kd">extends</span> <span class="n">GroupReduceFunction</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;,</span>
+                                     <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">in</span><span class="o">,</span>
+                     <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="c1">// one element is always present in iterator</span>
+    <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">curr</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
+    <span class="n">String</span> <span class="n">key</span> <span class="o">=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
+    <span class="kt">int</span> <span class="n">intSum</span> <span class="o">=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+    <span class="kt">double</span> <span class="n">doubleSum</span> <span class="o">=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f2</span><span class="o">;</span>
+    <span class="c1">// sum up all ints and doubles</span>
+    <span class="k">while</span><span class="o">(</span><span class="n">in</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+      <span class="n">curr</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
+      <span class="n">intSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+      <span class="n">doubleSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f2</span><span class="o">;</span>
+    <span class="o">}</span>
+    <span class="c1">// emit a tuple with both sums</span>
+    <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;(</span><span class="n">key</span><span class="o">,</span> <span class="n">intSum</span><span class="o">,</span> <span class="n">doubleSum</span><span class="o">));</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">combine</span><span class="o">(</span><span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">in</span><span class="o">,</span>
+                      <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="c1">// in some cases combine() calls can simply be forwarded to reduce().</span>
+    <span class="k">this</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre></div>
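+<p>Forwarding <code>combine()</code> to <code>reduce()</code> is only valid for some functions. The following plain-Java sketch (not Flink code; <code>CombineSketch</code>, <code>GroupFunction</code>, <code>SUM</code>, and <code>COUNT</code> are hypothetical names) models a combine step as applying the function to each partition and a reduce step as applying the same function to the partial results. It is a simplified model of pre-aggregation, not a description of the actual runtime.</p>

```java
import java.util.ArrayList;
import java.util.List;

final class CombineSketch {

  // Hypothetical stand-in for a group-wise function: many values in, one out.
  interface GroupFunction {
    int apply(List<Integer> group);
  }

  // Summation: partial sums can be summed again, so the same logic can
  // serve as both the combine step and the final reduce step.
  static final GroupFunction SUM = new GroupFunction() {
    @Override
    public int apply(List<Integer> group) {
      int sum = 0;
      for (int value : group) {
        sum += value;
      }
      return sum;
    }
  };

  // Counting: a partial count is not an element to be counted, so reusing
  // this logic as the combine step gives a wrong result.
  static final GroupFunction COUNT = new GroupFunction() {
    @Override
    public int apply(List<Integer> group) {
      return group.size();
    }
  };

  // Applies f to each partition (combine), then to the partial results (reduce).
  static int combineThenReduce(GroupFunction f, List<List<Integer>> partitions) {
    List<Integer> partials = new ArrayList<Integer>();
    for (List<Integer> partition : partitions) {
      partials.add(f.apply(partition));
    }
    return f.apply(partials);
  }

  // Applies f once to all values, without a combine step.
  static int reduceOnly(GroupFunction f, List<List<Integer>> partitions) {
    List<Integer> all = new ArrayList<Integer>();
    for (List<Integer> partition : partitions) {
      all.addAll(partition);
    }
    return f.apply(all);
  }
}
```

+<p>For <code>SUM</code> both paths agree. For <code>COUNT</code>, the combined path counts the partial results instead of the original elements, so a counting function would need a <code>combine()</code> that emits partial counts and a <code>reduce()</code> that adds them up.</p>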
+<h3 id="aggregate-on-grouped-tuple-dataset">Aggregate on grouped Tuple DataSet</h3>
+
+<p>There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:</p>
+
+<ul>
+<li>Sum,</li>
+<li>Min, and</li>
+<li>Max.</li>
+</ul>
+
+<p>The Aggregate transformation can only be applied on a <code>Tuple</code> <code>DataSet</code> and supports only field position keys for grouping.</p>
+
+<p>The following code shows how to apply an Aggregation transformation on a <code>DataSet</code> grouped by field position keys:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span>
+                                          <span class="c1">// group DataSet on second field</span>
+                                          <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
+                                          <span class="c1">// compute sum of the first field</span>
+                                          <span class="o">.</span><span class="na">aggregate</span><span class="o">(</span><span class="n">SUM</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span>
+                                          <span class="c1">// compute minimum of the third field</span>
+                                          <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">MIN</span><span class="o">,</span> <span class="mi">2</span><span class="o">);</span>
+</code></pre></div>
+<p>To apply multiple aggregations on a DataSet, use the <code>.and()</code> function after the first aggregate: <code>.aggregate(SUM, 0).and(MIN, 2)</code> produces the sum of field 0 and the minimum of field 2 of the original DataSet.
+In contrast, <code>.aggregate(SUM, 0).aggregate(MIN, 2)</code> applies an aggregation on an aggregation. In the given example, it would produce the minimum of field 2 after calculating the sum of field 0 grouped by field 1.</p>
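+
+<p>The result of the <code>.and()</code> variant can be sketched in plain Java as a single pass that computes both aggregates per group over the original rows. This is not Flink code: <code>AggregateSketch</code>, <code>Row</code>, and <code>sumAndMin</code> are hypothetical names, and the sorted map is used only to make the result easy to inspect.</p>

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class AggregateSketch {

  // Hypothetical record standing in for a three-field tuple.
  static final class Row {
    final int f0;
    final String f1;
    final double f2;

    Row(int f0, String f1, double f2) {
      this.f0 = f0;
      this.f1 = f1;
      this.f2 = f2;
    }
  }

  // Groups rows by f1 and computes, per group and over the ORIGINAL rows,
  // both the sum of f0 and the minimum of f2.
  static Map<String, Row> sumAndMin(List<Row> rows) {
    Map<String, Row> result = new TreeMap<String, Row>();
    for (Row row : rows) {
      Row acc = result.get(row.f1);
      if (acc == null) {
        // first row of this group starts both aggregates
        result.put(row.f1, row);
      } else {
        result.put(row.f1, new Row(acc.f0 + row.f0, row.f1, Math.min(acc.f2, row.f2)));
      }
    }
    return result;
  }
}
```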
+
+<p><strong>Note:</strong> The set of aggregation functions will be extended in the future.</p>
+
+<h3 id="reduce-on-full-dataset">Reduce on full DataSet</h3>
+
+<p>The Reduce transformation applies a user-defined <code>ReduceFunction</code> to all elements of a <code>DataSet</code>.
+The <code>ReduceFunction</code> successively combines pairs of elements into one element until only a single element remains.</p>
+
+<p>The following code shows how to sum all elements of an Integer <code>DataSet</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// ReduceFunction that sums Integers</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">IntSummer</span> <span class="kd">extends</span> <span class="n">ReduceFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Integer</span> <span class="n">num1</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">num2</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">num1</span> <span class="o">+</span> <span class="n">num2</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">intNumbers</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">sum</span> <span class="o">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">IntSummer</span><span class="o">());</span>
+</code></pre></div>
+<p>Reducing a full <code>DataSet</code> using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a <code>ReduceFunction</code> is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.</p>
+
+<h3 id="groupreduce-on-full-dataset">GroupReduce on full DataSet</h3>
+
+<p>The GroupReduce transformation applies a user-defined <code>GroupReduceFunction</code> on all elements of a <code>DataSet</code>.
+A <code>GroupReduceFunction</code> can iterate over all elements of a <code>DataSet</code> and return an arbitrary number of result elements.</p>
+
+<p>The following example shows how to apply a GroupReduce transformation on a full <code>DataSet</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="c1">// apply a (preferably combinable) GroupReduceFunction to a DataSet</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyGroupReducer</span><span class="o">());</span>
+</code></pre></div>
+<p><strong>Note:</strong> A GroupReduce transformation on a full <code>DataSet</code> cannot be done in parallel if the <code>GroupReduceFunction</code> is not combinable. Therefore, this can be a very compute-intensive operation. See the paragraph on &quot;Combinable <code>GroupReduceFunction</code>s&quot; above to learn how to implement a combinable <code>GroupReduceFunction</code>.</p>
+
+<h3 id="aggregate-on-full-tuple-dataset">Aggregate on full Tuple DataSet</h3>
+
+<p>There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:</p>
+
+<ul>
+<li>Sum,</li>
+<li>Min, and</li>
+<li>Max.</li>
+</ul>
+
+<p>The Aggregate transformation can only be applied on a <code>Tuple</code> <code>DataSet</code>.</p>
+
+<p>The following code shows how to apply an Aggregation transformation on a full <code>DataSet</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span>
+                                          <span class="c1">// compute sum of the first field</span>
+                                          <span class="o">.</span><span class="na">aggregate</span><span class="o">(</span><span class="n">SUM</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span>
+                                          <span class="c1">// compute minimum of the second field</span>
+                                          <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">MIN</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span>
+</code></pre></div>
+<p><strong>Note:</strong> Extending the set of supported aggregation functions is on our roadmap.</p>
+
+<h3 id="join">Join</h3>
+
+<p>The Join transformation joins two <code>DataSet</code>s into one <code>DataSet</code>. The elements of both <code>DataSet</code>s are joined on one or more keys which can be specified using</p>
+
+<ul>
+<li>a <code>KeySelector</code> function or</li>
+<li>one or more field position keys (<code>Tuple</code> <code>DataSet</code> only).</li>
+</ul>
+
+<p>There are a few different ways to perform a Join transformation, which are shown below.</p>
+
+<h4 id="default-join-(join-into-tuple2)">Default Join (Join into Tuple2)</h4>
+
+<p>The default Join transformation produces a new <code>Tuple</code> <code>DataSet</code> with two fields. Each tuple holds a joined element of the first input <code>DataSet</code> in the first tuple field and a matching element of the second input <code>DataSet</code> in the second field.</p>
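+
+<p>The shape of that result can be sketched in plain Java. This is not Flink code: <code>DefaultJoinSketch</code>, <code>Left</code>, <code>Right</code>, and <code>Joined</code> are hypothetical names, the choice of <code>id</code> as the join key is an assumption, and the nested loop is used only for brevity, since an engine would typically choose a hash-based or sort-based strategy.</p>

```java
import java.util.ArrayList;
import java.util.List;

final class DefaultJoinSketch {

  // Hypothetical element type of the first input.
  static final class Left {
    final int id;
    final String name;

    Left(int id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Hypothetical element type of the second input.
  static final class Right {
    final double score;
    final int id;

    Right(double score, int id) {
      this.score = score;
      this.id = id;
    }
  }

  // Two-field result: the whole left element and the whole right element.
  static final class Joined {
    final Left first;
    final Right second;

    Joined(Left first, Right second) {
      this.first = first;
      this.second = second;
    }
  }

  // Nested-loop equi-join: emits one pair for every combination of a left
  // and a right element whose keys are equal.
  static List<Joined> join(List<Left> left, List<Right> right) {
    List<Joined> out = new ArrayList<Joined>();
    for (Left l : left) {
      for (Right r : right) {
        if (l.id == r.id) {
          out.add(new Joined(l, r));
        }
      }
    }
    return out;
  }
}
```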
+
+<p>The following code shows a default Join transformation using field position keys:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">input1</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">input2</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="c1">// result dataset is typed as Tuple2</span>

[... 721 lines stripped ...]