You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/06/30 12:16:18 UTC

[48/51] [partial] flink-web git commit: [hotfix] Manual build of docs

http://git-wip-us.apache.org/repos/asf/flink-web/blob/396616d4/content/docs/0.9/apis/dataset_transformations.html
----------------------------------------------------------------------
diff --git a/content/docs/0.9/apis/dataset_transformations.html b/content/docs/0.9/apis/dataset_transformations.html
new file mode 100644
index 0000000..5247ebc
--- /dev/null
+++ b/content/docs/0.9/apis/dataset_transformations.html
@@ -0,0 +1,1720 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<!DOCTYPE html>
+
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
+    
+    <title>Apache Flink 0.9.0 Documentation: DataSet Transformations</title>
+    
+    <link rel="shortcut icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
+    <link rel="icon" href="http://flink.apache.org/docs/0.9/page/favicon.ico" type="image/x-icon">
+
+    <!-- Bootstrap -->
+    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+    <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/flink.css">
+    <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/syntax.css">
+    <link rel="stylesheet" href="http://flink.apache.org/docs/0.9/page/css/codetabs.css">
+    
+    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
+    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
+    <!--[if lt IE 9]>
+      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
+      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
+    <![endif]-->
+  </head>
+  <body>
+    
+    
+
+
+
+
+    <!-- Top navbar. -->
+    <nav class="navbar navbar-default navbar-fixed-top">
+      <div class="container">
+        <!-- The logo. -->
+        <div class="navbar-header">
+          <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+          </button>
+          <div class="navbar-logo">
+            <a href="http://flink.apache.org"><img alt="Apache Flink" src="http://flink.apache.org/docs/0.9/page/img/navbar-brand-logo.jpg"></a>
+          </div>
+        </div><!-- /.navbar-header -->
+
+        <!-- The navigation links. -->
+        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
+          <ul class="nav navbar-nav">
+            <li><a href="http://flink.apache.org/docs/0.9/index.html">Overview<span class="hidden-sm hidden-xs"> 0.9.0</span></a></li>
+
+            <!-- Setup -->
+            <li class="dropdown">
+              <a href="http://flink.apache.org/docs/0.9/setup" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Setup <span class="caret"></span></a>
+              <ul class="dropdown-menu" role="menu">
+                <li><a href="http://flink.apache.org/docs/0.9/setup/building.html">Get Flink 0.9-SNAPSHOT</a></li>
+
+                <li class="divider"></li>
+                <li role="presentation" class="dropdown-header"><strong>Deployment</strong></li>
+                <li><a href="http://flink.apache.org/docs/0.9/setup/local_setup.html" class="active">Local</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/setup/cluster_setup.html">Cluster (Standalone)</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/setup/yarn_setup.html">YARN</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/setup/gce_setup.html">GCloud</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/setup/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li>
+
+                <li class="divider"></li>
+                <li><a href="http://flink.apache.org/docs/0.9/setup/config.html">Configuration</a></li>
+              </ul>
+            </li>
+
+            <!-- Programming Guides -->
+            <li class="dropdown">
+              <a href="http://flink.apache.org/docs/0.9/apis" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Programming Guides <span class="caret"></span></a>
+              <ul class="dropdown-menu" role="menu">
+                <li><a href="http://flink.apache.org/docs/0.9/apis/programming_guide.html"><strong>Batch: DataSet API</strong></a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/streaming_guide.html"><strong>Streaming: DataStream API</strong> <span class="badge">Beta</span></a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/python.html">Python API <span class="badge">Beta</span></a></li>
+
+                <li class="divider"></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/scala_shell.html">Interactive Scala Shell</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/dataset_transformations.html">Dataset Transformations</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/best_practices.html">Best Practices</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/example_connectors.html">Connectors</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/examples.html">Examples</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/local_execution.html">Local Execution</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/cluster_execution.html">Cluster Execution</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/cli.html">Command Line Interface</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/web_client.html">Web Client</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/iterations.html">Iterations</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/java8.html">Java 8</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/apis/hadoop_compatibility.html">Hadoop Compatibility <span class="badge">Beta</span></a></li>
+              </ul>
+            </li>
+
+            <!-- Libraries -->
+            <li class="dropdown">
+              <a href="http://flink.apache.org/docs/0.9/libs" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Libraries <span class="caret"></span></a>
+                <ul class="dropdown-menu" role="menu">
+                  <li><a href="http://flink.apache.org/docs/0.9/libs/spargel_guide.html">Graphs: Spargel</a></li>
+                  <li><a href="http://flink.apache.org/docs/0.9/libs/gelly_guide.html">Graphs: Gelly <span class="badge">Beta</span></a></li>
+                  <li><a href="http://flink.apache.org/docs/0.9/libs/ml/">Machine Learning <span class="badge">Beta</span></a></li>
+                  <li><a href="http://flink.apache.org/docs/0.9/libs/table.html">Relational: Table <span class="badge">Beta</span></a></li>
+              </ul>
+            </li>
+
+            <!-- Internals -->
+            <li class="dropdown">
+              <a href="http://flink.apache.org/docs/0.9/internals" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Internals <span class="caret"></span></a>
+              <ul class="dropdown-menu" role="menu">
+                <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
+                <li><a href="http://flink.apache.org/docs/0.9/internals/how_to_contribute.html">How to Contribute</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/internals/coding_guidelines.html">Coding Guidelines</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/internals/ide_setup.html">IDE Setup</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/internals/logging.html">Logging</a></li>
+                <li class="divider"></li>
+                <li role="presentation" class="dropdown-header"><strong>Internals</strong></li>
+                <li><a href="http://flink.apache.org/docs/0.9/internals/general_arch.html">Architecture &amp; Process Model</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/internals/types_serialization.html">Type Extraction &amp; Serialization</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/internals/job_scheduling.html">Jobs &amp; Scheduling</a></li>
+                <li><a href="http://flink.apache.org/docs/0.9/internals/add_operator.html">How-To: Add an Operator</a></li>
+              </ul>
+            </li>
+          </ul>
+          <form class="navbar-form navbar-right hidden-sm hidden-md" role="search" action="http://flink.apache.org/docs/0.9/search-results.html">
+            <div class="form-group">
+              <input type="text" class="form-control" name="q" placeholder="Search all pages">
+            </div>
+            <button type="submit" class="btn btn-default">Search</button>
+          </form>
+        </div><!-- /.navbar-collapse -->
+      </div><!-- /.container -->
+    </nav>
+
+
+    
+
+    <!-- Main content. -->
+    <div class="container">
+      
+      
+<div class="row">
+  <div class="col-sm-10 col-sm-offset-1">
+    <h1>DataSet Transformations</h1>
+
+
+
+<p>This document gives a deep-dive into the available transformations on DataSets. For a general introduction to the
+Flink Java API, please refer to the <a href="programming_guide.html">Programming Guide</a>.</p>
+
+<ul id="markdown-toc">
+  <li><a href="#map" id="markdown-toc-map">Map</a></li>
+  <li><a href="#flatmap" id="markdown-toc-flatmap">FlatMap</a></li>
+  <li><a href="#mappartition" id="markdown-toc-mappartition">MapPartition</a></li>
+  <li><a href="#filter" id="markdown-toc-filter">Filter</a></li>
+  <li><a href="#project-tuple-datasets-only-javapython-api-only" id="markdown-toc-project-tuple-datasets-only-javapython-api-only">Project (Tuple DataSets only) (Java/Python API Only)</a></li>
+  <li><a href="#transformations-on-grouped-dataset" id="markdown-toc-transformations-on-grouped-dataset">Transformations on Grouped DataSet</a></li>
+  <li><a href="#transformations-on-grouped-dataset-1" id="markdown-toc-transformations-on-grouped-dataset-1">Transformations on Grouped DataSet</a></li>
+  <li><a href="#reduce-on-grouped-dataset" id="markdown-toc-reduce-on-grouped-dataset">Reduce on Grouped DataSet</a></li>
+  <li><a href="#groupreduce-on-grouped-dataset" id="markdown-toc-groupreduce-on-grouped-dataset">GroupReduce on Grouped DataSet</a></li>
+  <li><a href="#groupcombine-on-a-grouped-dataset" id="markdown-toc-groupcombine-on-a-grouped-dataset">GroupCombine on a Grouped DataSet</a></li>
+  <li><a href="#aggregate-on-grouped-tuple-dataset" id="markdown-toc-aggregate-on-grouped-tuple-dataset">Aggregate on Grouped Tuple DataSet</a></li>
+  <li><a href="#reduce-on-full-dataset" id="markdown-toc-reduce-on-full-dataset">Reduce on full DataSet</a></li>
+  <li><a href="#groupreduce-on-full-dataset" id="markdown-toc-groupreduce-on-full-dataset">GroupReduce on full DataSet</a></li>
+  <li><a href="#groupcombine-on-a-full-dataset" id="markdown-toc-groupcombine-on-a-full-dataset">GroupCombine on a full DataSet</a></li>
+  <li><a href="#aggregate-on-full-tuple-dataset" id="markdown-toc-aggregate-on-full-tuple-dataset">Aggregate on full Tuple DataSet</a></li>
+  <li><a href="#join" id="markdown-toc-join">Join</a></li>
+  <li><a href="#cross" id="markdown-toc-cross">Cross</a></li>
+  <li><a href="#cogroup" id="markdown-toc-cogroup">CoGroup</a></li>
+  <li><a href="#union" id="markdown-toc-union">Union</a></li>
+  <li><a href="#rebalance" id="markdown-toc-rebalance">Rebalance</a></li>
+  <li><a href="#hash-partition" id="markdown-toc-hash-partition">Hash-Partition</a></li>
+  <li><a href="#sort-partition" id="markdown-toc-sort-partition">Sort Partition</a></li>
+  <li><a href="#first-n" id="markdown-toc-first-n">First-n</a></li>
+</ul>
+
+<h3 id="map">Map</h3>
+
+<p>The Map transformation applies a user-defined map function on each element of a DataSet.
+It implements a one-to-one mapping, that is, exactly one element must be returned by
+the function.</p>
+
+<p>The following code transforms a DataSet of Integer pairs into a DataSet of Integers:</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="c1">// MapFunction that adds two integer values</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">IntAdder</span> <span class="kd">implements</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Integer</span> <span class="nf">map</span><span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">in</span><span class="o">.</span><span class="na">f0</span> <span class="o">+</span> <span class="n">in</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">intPairs</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">intSums</span> <span class="o">=</span> <span class="n">intPairs</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">IntAdder</span><span class="o">());</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">intPairs</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="k">val</span> <span class="n">intSums</span> <span class="k">=</span> <span class="n">intPairs</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">pair</span> <span class="k">=&gt;</span> <span class="n">pair</span><span class="o">.</span><span class="n">_1</span> <span class="o">+</span> <span class="n">pair</span><span class="o">.</span><span class="n">_2</span> <span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"> <span class="n">intSums</span> <span class="o">=</span> <span class="n">intPairs</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="nb">sum</span><span class="p">(</span><span class="n">x</span><span class="p">),</span> <span class="n">INT</span><span class="p">)</span></code></pre></div>
+
+  </div>
+</div>
+
+<h3 id="flatmap">FlatMap</h3>
+
+<p>The FlatMap transformation applies a user-defined flat-map function on each element of a DataSet.
+This variant of a map function can return arbitrarily many result elements (including none) for each input element.</p>
+
+<p>The following code transforms a DataSet of text lines into a DataSet of words:</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="c1">// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">Tokenizer</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">token</span> <span class="o">:</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W&quot;</span><span class="o">))</span> <span class="o">{</span>
+      <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">token</span><span class="o">);</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">textLines</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">textLines</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">());</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">textLines</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">textLines</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span> <span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"> <span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flat_map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[</span><span class="n">line</span><span class="o">.</span><span class="n">split</span><span class="p">()</span> <span class="k">for</span> <span class="n">line</span> <span class="ow">in</span> <span class="n">x</span><span class="p">],</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div>
+
+  </div>
+</div>
+
+<h3 id="mappartition">MapPartition</h3>
+
+<p>MapPartition transforms a parallel partition in a single function call. The map-partition function
+gets the partition as Iterable and can produce an arbitrary number of result values. The number of elements in each partition depends on the degree-of-parallelism
+and previous operations.</p>
+
+<p>The following code transforms a DataSet of text lines into a DataSet of counts per partition:</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">PartitionCounter</span> <span class="kd">implements</span> <span class="n">MapPartitionFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="o">{</span>
+
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapPartition</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="kt">long</span> <span class="n">c</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">c</span><span class="o">++;</span>
+    <span class="o">}</span>
+    <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">c</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">textLines</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">counts</span> <span class="o">=</span> <span class="n">textLines</span><span class="o">.</span><span class="na">mapPartition</span><span class="o">(</span><span class="k">new</span> <span class="nf">PartitionCounter</span><span class="o">());</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">textLines</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="c1">// Some is required because the return value must be a Collection.</span>
+<span class="c1">// There is an implicit conversion from Option to a Collection.</span>
+<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">textLines</span><span class="o">.</span><span class="n">mapPartition</span> <span class="o">{</span> <span class="n">in</span> <span class="k">=&gt;</span> <span class="nc">Some</span><span class="o">(</span><span class="n">in</span><span class="o">.</span><span class="n">size</span><span class="o">)</span> <span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"> <span class="n">counts</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map_partition</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">,</span><span class="n">c</span><span class="p">:</span> <span class="p">[</span><span class="nb">sum</span><span class="p">(</span><span class="mi">1</span> <span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="n">x</span><span class="p">)],</span> <span class="n">INT</span><span class="p">)</span></code></pre></div>
+
+  </div>
+</div>
+
+<h3 id="filter">Filter</h3>
+
+<p>The Filter transformation applies a user-defined filter function on each element of a DataSet and retains only those elements for which the function returns <code>true</code>.</p>
+
+<p>The following code removes all Integers smaller than zero from a DataSet:</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="c1">// FilterFunction that filters out all Integers smaller than zero.</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">NaturalNumberFilter</span> <span class="kd">implements</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">Integer</span> <span class="n">number</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">number</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">intNumbers</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">naturalNumbers</span> <span class="o">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="nf">NaturalNumberFilter</span><span class="o">());</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">intNumbers</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="k">val</span> <span class="n">naturalNumbers</span> <span class="k">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="n">filter</span> <span class="o">{</span> <span class="k">_</span> <span class="o">&gt;=</span> <span class="mi">0</span> <span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"> <span class="n">naturalNumbers</span> <span class="o">=</span> <span class="n">intNumbers</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="n">x</span> <span class="o">&gt;=</span> <span class="mi">0</span><span class="p">)</span></code></pre></div>
+
+  </div>
+</div>
+
+<p><strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption
+can lead to incorrect results.</p>
+
+<h3 id="project-tuple-datasets-only-javapython-api-only">Project (Tuple DataSets only) (Java/Python API Only)</h3>
+
+<p>The Project transformation removes or moves Tuple fields of a Tuple DataSet.
+The <code>project(int...)</code> method selects Tuple fields that should be retained by their index and defines their order in the output Tuple.</p>
+
+<p>Projections do not require the definition of a user function.</p>
+
+<p>The following code shows different ways to apply a Project transformation on a DataSet:</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">in</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="c1">// converts Tuple3&lt;Integer, Double, String&gt; into Tuple2&lt;String, Integer&gt;</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span> <span class="o">=</span> <span class="n">in</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span><span class="mi">0</span><span class="o">);</span></code></pre></div>
+
+    <h4 id="projection-with-type-hint">Projection with Type Hint</h4>
+
+    <p>Note that the Java compiler cannot infer the return type of the <code>project</code> operator. This can cause a problem if you call another operator on the result of the <code>project</code> operator, such as:</p>
+
+    <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple5</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">ds</span> <span class="o">=</span> <span class="o">....</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">ds2</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.</span><span class="na">project</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">distinct</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span></code></pre></div>
+
+    <p>This problem can be overcome by hinting the return type of the <code>project</code> operator like this:</p>
+
+    <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">ds2</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span><span class="n">project</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="na">distinct</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span></code></pre></div>
+
+    <h3 id="transformations-on-grouped-dataset">Transformations on Grouped DataSet</h3>
+
+    <p>The reduce operations can operate on grouped data sets. Specifying the key to
+be used for grouping can be done in many ways:</p>
+
+    <ul>
+      <li>key expressions</li>
+      <li>a key-selector function</li>
+      <li>one or more field position keys (Tuple DataSet only)</li>
+      <li>Case Class fields (Case Classes only)</li>
+    </ul>
+
+    <p>Please look at the reduce examples to see how the grouping keys are specified.</p>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"><span class="n">out</span> <span class="o">=</span> <span class="ow">in</span><span class="o">.</span><span class="n">project</span><span class="p">(</span><span class="mi">2</span><span class="p">,</span><span class="mi">0</span><span class="p">);</span></code></pre></div>
+
+    <h3 id="transformations-on-grouped-dataset-1">Transformations on Grouped DataSet</h3>
+
+    <p>The reduce operations can operate on grouped data sets. Specifying the key to
+be used for grouping can be done using one or more field position keys (Tuple DataSet only).</p>
+
+    <p>Please look at the reduce examples to see how the grouping keys are specified.</p>
+
+  </div>
+</div>
+
+<h3 id="reduce-on-grouped-dataset">Reduce on Grouped DataSet</h3>
+
+<p>A Reduce transformation that is applied on a grouped DataSet reduces each group to a single
+element using a user-defined reduce function.
+For each group of input elements, a reduce function successively combines pairs of elements into one
+element until only a single element for each group remains.</p>
+
+<h4 id="reduce-on-dataset-grouped-by-keyselector-function">Reduce on DataSet Grouped by KeySelector Function</h4>
+
+<p>A key-selector function extracts a key value from each element of a DataSet. The extracted key
+value is used to group the DataSet.
+The following code shows how to group a POJO DataSet using a key-selector function and to reduce it
+with a reduce function.</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="c1">// some ordinary POJO</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WC</span> <span class="o">{</span>
+  <span class="kd">public</span> <span class="n">String</span> <span class="n">word</span><span class="o">;</span>
+  <span class="kd">public</span> <span class="kt">int</span> <span class="n">count</span><span class="o">;</span>
+  <span class="c1">// [...]</span>
+<span class="o">}</span>
+
+<span class="c1">// ReduceFunction that sums Integer attributes of a POJO</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordCounter</span> <span class="kd">implements</span> <span class="n">ReduceFunction</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">WC</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">WC</span> <span class="n">in1</span><span class="o">,</span> <span class="n">WC</span> <span class="n">in2</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="k">new</span> <span class="nf">WC</span><span class="o">(</span><span class="n">in1</span><span class="o">.</span><span class="na">word</span><span class="o">,</span> <span class="n">in1</span><span class="o">.</span><span class="na">count</span> <span class="o">+</span> <span class="n">in2</span><span class="o">.</span><span class="na">count</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">WC</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">words</span>
+                         <span class="c1">// DataSet grouping on field &quot;word&quot;</span>
+                         <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;word&quot;</span><span class="o">)</span>
+                         <span class="c1">// apply ReduceFunction on grouped DataSet</span>
+                         <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">WordCounter</span><span class="o">());</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="c1">// some ordinary POJO</span>
+<span class="k">class</span> <span class="nc">WC</span><span class="o">(</span><span class="k">val</span> <span class="n">word</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="k">val</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> <span class="o">{</span>
+  <span class="k">def</span> <span class="k">this</span><span class="o">()</span> <span class="o">{</span>
+    <span class="k">this</span><span class="o">(</span><span class="kc">null</span><span class="o">,</span> <span class="o">-</span><span class="mi">1</span><span class="o">)</span>
+  <span class="o">}</span>
+  <span class="c1">// [...]</span>
+<span class="o">}</span>
+
+<span class="k">val</span> <span class="n">words</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">WC</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="k">val</span> <span class="n">wordCounts</span> <span class="k">=</span> <span class="n">words</span><span class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">word</span> <span class="o">}</span> <span class="n">reduce</span> <span class="o">{</span>
+  <span class="o">(</span><span class="n">w1</span><span class="o">,</span> <span class="n">w2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="k">new</span> <span class="nc">WC</span><span class="o">(</span><span class="n">w1</span><span class="o">.</span><span class="n">word</span><span class="o">,</span> <span class="n">w1</span><span class="o">.</span><span class="n">count</span> <span class="o">+</span> <span class="n">w2</span><span class="o">.</span><span class="n">count</span><span class="o">)</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"><span class="n">Not</span> <span class="n">supported</span><span class="o">.</span></code></pre></div>
+  </div>
+</div>
+
+<h4 id="reduce-on-dataset-grouped-by-field-position-keys-tuple-datasets-only">Reduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)</h4>
+
+<p>Field position keys specify one or more fields of a Tuple DataSet that are used as grouping keys.
+The following code shows how to use field position keys and apply a reduce function.</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">tuples</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">reducedTuples</span> <span class="o">=</span>
+                                         <span class="n">tuples</span>
+                                         <span class="c1">// group DataSet on first and second field of Tuple</span>
+                                         <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span><span class="mi">1</span><span class="o">)</span>
+                                         <span class="c1">// apply ReduceFunction on grouped DataSet</span>
+                                         <span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">MyTupleReducer</span><span class="o">());</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">tuples</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="c1">// group on the first and second Tuple field</span>
+<span class="k">val</span> <span class="n">reducedTuples</span> <span class="k">=</span> <span class="n">tuples</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">1</span><span class="o">).</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span></code></pre></div>
+
+    <h4 id="reduce-on-dataset-grouped-by-case-class-fields">Reduce on DataSet grouped by Case Class Fields</h4>
+
+    <p>When using Case Classes you can also specify the grouping key using the names of the fields:</p>
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">MyClass</span><span class="o">(</span><span class="k">val</span> <span class="n">a</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">c</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">tuples</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">MyClass</span><span class="o">]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="c1">// group on the first and second field</span>
+<span class="k">val</span> <span class="n">reducedTuples</span> <span class="k">=</span> <span class="n">tuples</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;a&quot;</span><span class="o">,</span> <span class="s">&quot;b&quot;</span><span class="o">).</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"> <span class="n">reducedTuples</span> <span class="o">=</span> <span class="n">tuples</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span> <span class="o">...</span> <span class="p">)</span></code></pre></div>
+
+  </div>
+</div>
+
+<h3 id="groupreduce-on-grouped-dataset">GroupReduce on Grouped DataSet</h3>
+
+<p>A GroupReduce transformation that is applied on a grouped DataSet calls a user-defined
+group-reduce function for each group. The difference
+between this and <em>Reduce</em> is that the user-defined function gets the whole group at once.
+The function is invoked with an Iterable over all elements of a group and can return an arbitrary
+number of result elements.</p>
+
+<h4 id="groupreduce-on-dataset-grouped-by-field-position-keys-tuple-datasets-only">GroupReduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)</h4>
+
+<p>The following code shows how duplicate strings can be removed from a DataSet grouped by Integer.</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">DistinctReduce</span>
+         <span class="kd">implements</span> <span class="n">GroupReduceFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+
+    <span class="n">Set</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">uniqStrings</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;();</span>
+    <span class="n">Integer</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+
+    <span class="c1">// add all strings of the group to the set</span>
+    <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">t</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">key</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
+      <span class="n">uniqStrings</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">t</span><span class="o">.</span><span class="na">f1</span><span class="o">);</span>
+    <span class="o">}</span>
+
+    <span class="c1">// emit all unique strings.</span>
+    <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">s</span> <span class="o">:</span> <span class="n">uniqStrings</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;(</span><span class="n">key</span><span class="o">,</span> <span class="n">s</span><span class="o">));</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span>
+                           <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>            <span class="c1">// group DataSet by the first tuple field</span>
+                           <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">DistinctReduce</span><span class="o">());</span>  <span class="c1">// apply GroupReduceFunction</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">reduceGroup</span> <span class="o">{</span>
+      <span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)])</span> <span class="k">=&gt;</span>
+        <span class="n">in</span><span class="o">.</span><span class="n">toSet</span> <span class="n">foreach</span> <span class="o">(</span><span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">)</span>
+    <span class="o">}</span></code></pre></div>
+
+    <h4 id="groupreduce-on-dataset-grouped-by-case-class-fields">GroupReduce on DataSet Grouped by Case Class Fields</h4>
+
+    <p>Works analogously to grouping by Case Class fields in <em>Reduce</em> transformations.</p>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">DistinctReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span>
+   <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span>
+     <span class="n">dic</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span>
+     <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
+       <span class="n">dic</span><span class="p">[</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span> <span class="o">=</span> <span class="mi">1</span>
+     <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">dic</span><span class="o">.</span><span class="n">keys</span><span class="p">():</span>
+       <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">(</span><span class="n">key</span><span class="p">)</span>
+
+ <span class="n">output</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">DistinctReduce</span><span class="p">(),</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div>
+
+  </div>
+</div>
+
+<h4 id="groupreduce-on-dataset-grouped-by-keyselector-function">GroupReduce on DataSet Grouped by KeySelector Function</h4>
+
+<p>Works analogously to key-selector functions in <em>Reduce</em> transformations.</p>
+
+<h4 id="groupreduce-on-sorted-groups">GroupReduce on sorted groups</h4>
+
+<p>A group-reduce function accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined
+group-reduce function and improve its efficiency.</p>
+
+<p>The following code shows another example of how to remove duplicate Strings in a DataSet grouped by an Integer and sorted by String.</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="c1">// GroupReduceFunction that removes consecutive identical elements</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">DistinctReduce</span>
+         <span class="kd">implements</span> <span class="n">GroupReduceFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">Integer</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+    <span class="n">String</span> <span class="n">comp</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+
+    <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">t</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">key</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
+      <span class="n">String</span> <span class="n">next</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+
+      <span class="c1">// check if strings are different</span>
+      <span class="k">if</span> <span class="o">(</span><span class="n">comp</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="o">!</span><span class="n">next</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">comp</span><span class="o">))</span> <span class="o">{</span>
+        <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;(</span><span class="n">key</span><span class="o">,</span> <span class="n">next</span><span class="o">));</span>
+        <span class="n">comp</span> <span class="o">=</span> <span class="n">next</span><span class="o">;</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
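+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span>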
+                         <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>                         <span class="c1">// group DataSet by first field</span>
+                         <span class="o">.</span><span class="na">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">Order</span><span class="o">.</span><span class="na">ASCENDING</span><span class="o">)</span>      <span class="c1">// sort groups on second tuple field</span>
+                         <span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">DistinctReduce</span><span class="o">());</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="n">sortGroup</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="nc">Order</span><span class="o">.</span><span class="nc">ASCENDING</span><span class="o">).</span><span class="n">reduceGroup</span> <span class="o">{</span>
+      <span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span><span class="o">)])</span> <span class="k">=&gt;</span>
+        <span class="k">var</span> <span class="n">prev</span><span class="k">:</span> <span class="o">(</span><span class="kt">Int</span><span class="o">,</span> <span class="kt">String</span><span class="o">)</span> <span class="k">=</span> <span class="kc">null</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">t</span> <span class="k">&lt;-</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span>
+          <span class="k">if</span> <span class="o">(</span><span class="n">prev</span> <span class="o">==</span> <span class="kc">null</span> <span class="o">||</span> <span class="n">prev</span> <span class="o">!=</span> <span class="n">t</span><span class="o">)</span>
+            <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">t</span><span class="o">)</span>
+          <span class="n">prev</span> <span class="k">=</span> <span class="n">t</span>
+        <span class="o">}</span>
+    <span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">DistinctReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span>
+   <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span>
+     <span class="n">dic</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span>
+     <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
+       <span class="n">dic</span><span class="p">[</span><span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]]</span> <span class="o">=</span> <span class="mi">1</span>
+     <span class="k">for</span> <span class="n">key</span> <span class="ow">in</span> <span class="n">dic</span><span class="o">.</span><span class="n">keys</span><span class="p">():</span>
+       <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">(</span><span class="n">key</span><span class="p">)</span>
+
+ <span class="n">output</span> <span class="o">=</span> <span class="n">data</span><span class="o">.</span><span class="n">group_by</span><span class="p">(</span><span class="mi">0</span><span class="p">)</span><span class="o">.</span><span class="n">sort_group</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="n">Order</span><span class="o">.</span><span class="n">ASCENDING</span><span class="p">)</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">DistinctReduce</span><span class="p">(),</span> <span class="n">STRING</span><span class="p">)</span></code></pre></div>
+
+  </div>
+</div>
+
+<p><strong>Note:</strong> A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.</p>
+
+<h4 id="combinable-groupreducefunctions">Combinable GroupReduceFunctions</h4>
+
+<p>In contrast to a reduce function, a group-reduce function is not
+necessarily combinable. In order to make a group-reduce function
+combinable, you need to use the <code>RichGroupReduceFunction</code> variant,
+implement (override) the <code>combine()</code> method, and annotate the
+<code>RichGroupReduceFunction</code> with the <code>@Combinable</code> annotation as shown here:</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="c1">// Combinable GroupReduceFunction that computes two sums.</span>
+<span class="c1">// Note that we use the RichGroupReduceFunction because it defines the combine method</span>
+<span class="nd">@Combinable</span>
+<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCombinableGroupReducer</span>
+         <span class="kd">extends</span> <span class="n">RichGroupReduceFunction</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;,</span>
+                                     <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">in</span><span class="o">,</span>
+                     <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+
+    <span class="n">String</span> <span class="n">key</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span>
+    <span class="kt">int</span> <span class="n">intSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="kt">double</span> <span class="n">doubleSum</span> <span class="o">=</span> <span class="mf">0.0</span><span class="o">;</span>
+
+    <span class="k">for</span> <span class="o">(</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="n">curr</span> <span class="o">:</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">key</span> <span class="o">=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
+      <span class="n">intSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+      <span class="n">doubleSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="na">f2</span><span class="o">;</span>
+    <span class="o">}</span>
+    <span class="c1">// emit a tuple with both sums</span>
+    <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;(</span><span class="n">key</span><span class="o">,</span> <span class="n">intSum</span><span class="o">,</span> <span class="n">doubleSum</span><span class="o">));</span>
+  <span class="o">}</span>
+
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">combine</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">in</span><span class="o">,</span>
+                      <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+    <span class="c1">// in some cases combine() calls can simply be forwarded to reduce().</span>
+    <span class="k">this</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="c1">// Combinable GroupReduceFunction that computes two sums.</span>
+<span class="c1">// Note that we use the RichGroupReduceFunction because it defines the combine method</span>
+<span class="nd">@Combinable</span>
+<span class="k">class</span> <span class="nc">MyCombinableGroupReducer</span>
+  <span class="k">extends</span> <span class="nc">RichGroupReduceFunction</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)</span>, <span class="o">(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="o">{</span>
+
+  <span class="k">def</span> <span class="n">reduce</span><span class="o">(</span>
+      <span class="n">in</span><span class="k">:</span> <span class="kt">java.lang.Iterable</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)],</span>
+      <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
+
+    <span class="k">var</span> <span class="n">key</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="kc">null</span>
+    <span class="k">var</span> <span class="n">intSum</span> <span class="k">=</span> <span class="mi">0</span>
+    <span class="k">var</span> <span class="n">doubleSum</span> <span class="k">=</span> <span class="mf">0.0</span>
+
+    <span class="k">for</span> <span class="o">(</span><span class="n">curr</span> <span class="k">&lt;-</span> <span class="n">in</span><span class="o">)</span> <span class="o">{</span>
+      <span class="n">key</span> <span class="k">=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_1</span>
+      <span class="n">intSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_2</span>
+      <span class="n">doubleSum</span> <span class="o">+=</span> <span class="n">curr</span><span class="o">.</span><span class="n">_3</span>
+    <span class="o">}</span>
+    <span class="c1">// emit a tuple with both sums</span>
+    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">intSum</span><span class="o">,</span> <span class="n">doubleSum</span><span class="o">);</span>
+  <span class="o">}</span>
+
+  <span class="k">def</span> <span class="n">combine</span><span class="o">(</span>
+      <span class="n">in</span><span class="k">:</span> <span class="kt">java.lang.Iterable</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)],</span>
+      <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span>, <span class="kt">Double</span><span class="o">)])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
+    <span class="c1">// in some cases combine() calls can simply be forwarded to reduce().</span>
+    <span class="k">this</span><span class="o">.</span><span class="n">reduce</span><span class="o">(</span><span class="n">in</span><span class="o">,</span> <span class="n">out</span><span class="o">)</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="python">
+
+    <div class="highlight"><pre><code class="language-python"> <span class="k">class</span> <span class="nc">GroupReduce</span><span class="p">(</span><span class="n">GroupReduceFunction</span><span class="p">):</span>
+   <span class="k">def</span> <span class="nf">reduce</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span>
+     <span class="n">key</span><span class="p">,</span> <span class="n">int_sum</span><span class="p">,</span> <span class="n">float_sum</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="n">next</span><span class="p">()</span>
+     <span class="k">for</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">iterator</span><span class="p">:</span>
+       <span class="n">int_sum</span> <span class="o">+=</span> <span class="n">value</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
+       <span class="n">float_sum</span> <span class="o">+=</span> <span class="n">value</span><span class="p">[</span><span class="mi">2</span><span class="p">]</span>
+     <span class="n">collector</span><span class="o">.</span><span class="n">collect</span><span class="p">((</span><span class="n">key</span><span class="p">,</span> <span class="n">int_sum</span><span class="p">,</span> <span class="n">float_sum</span><span class="p">))</span>
+   <span class="c"># in some cases combine() calls can simply be forwarded to reduce().</span>
+   <span class="k">def</span> <span class="nf">combine</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">):</span>
+     <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="n">iterator</span><span class="p">,</span> <span class="n">collector</span><span class="p">)</span>
+
+<span class="n">data</span><span class="o">.</span><span class="n">reduce_group</span><span class="p">(</span><span class="n">GroupReduce</span><span class="p">(),</span> <span class="p">(</span><span class="n">STRING</span><span class="p">,</span> <span class="n">INT</span><span class="p">,</span> <span class="n">FLOAT</span><span class="p">),</span> <span class="n">combinable</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span></code></pre></div>
+
+  </div>
+</div>
+
+<h3 id="groupcombine-on-a-grouped-dataset">GroupCombine on a Grouped DataSet</h3>
+
+<p>The GroupCombine transformation is the generalized form of the combine step in
+the Combinable GroupReduceFunction. It is generalized in the sense that it
+allows combining of input type <code>I</code> to an arbitrary output type <code>O</code>. In contrast,
+the combine step in the GroupReduce only allows combining from input type <code>I</code> to
+output type <code>I</code>. This is because the reduce step in the GroupReduceFunction
+expects input type <code>I</code>.</p>
+
+<p>In some applications, it is desirable to combine a DataSet into an intermediate
+format before performing additional transformations (e.g. to reduce data
+size). This can be achieved with a CombineGroup transformation at very little
+cost.</p>
+
+<p><strong>Note:</strong> The GroupCombine on a Grouped DataSet is performed in memory with a
+  greedy strategy which may not process all data at once but in multiple
+  steps. It is also performed on the individual partitions without a data
+  exchange like in a GroupReduce transformation. This may lead to partial
+  results.</p>
+
+<p>The following example demonstrates the use of a CombineGroup transformation for
+an alternative WordCount implementation:</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="o">[..]</span> <span class="c1">// The words received as input</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedInput</span> <span class="o">=</span> <span class="n">input</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="c1">// group identical words</span>
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">combinedWords</span> <span class="o">=</span> <span class="n">groupedInput</span><span class="o">.</span><span class="na">combineGroup</span><span class="o">(</span><span class="k">new</span> <span class="n">GroupCombineFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
+
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">combine</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// combine</span>
+        <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">count</span><span class="o">++;</span>
+        <span class="o">}</span>
+        <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tuple2</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">));</span>
+    <span class="o">}</span>
+<span class="o">});</span>
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">groupedCombinedWords</span> <span class="o">=</span> <span class="n">combinedWords</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="c1">// group by words again</span>
+
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">groupedCombinedWords</span><span class="o">.</span><span class="na">reduceGroup</span><span class="o">(</span><span class="k">new</span> <span class="nf">GroupReduceFunction</span><span class="o">()</span> <span class="o">{</span> <span class="c1">// group reduce with full data exchange</span>
+
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">reduce</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">words</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">{</span>
+        <span class="kt">int</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">count</span><span class="o">++;</span>
+        <span class="o">}</span>
+        <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tuple2</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">));</span>
+    <span class="o">}</span>
+<span class="o">});</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">[</span><span class="kt">..</span><span class="o">]</span> <span class="c1">// The words received as input</span>
+<span class="k">val</span> <span class="n">groupedInput</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+
+<span class="k">val</span> <span class="n">combinedWords</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">groupedInput</span><span class="o">.</span><span class="n">combineGroup</span> <span class="o">{</span>
+    <span class="o">(</span><span class="n">words</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)])</span> <span class="k">=&gt;</span>
+        <span class="k">var</span> <span class="n">count</span> <span class="k">=</span> <span class="mi">0</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">word</span> <span class="k">&lt;-</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">count</span><span class="o">++</span>
+        <span class="o">}</span>
+        <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span>
+<span class="o">}</span>
+
+<span class="k">val</span> <span class="n">groupedCombinedWords</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">combinedWords</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">0</span><span class="o">)</span>
+
+<span class="k">val</span> <span class="n">output</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="n">groupedCombinedWords</span><span class="o">.</span><span class="n">reduceGroup</span> <span class="o">{</span>
+    <span class="o">(</span><span class="n">words</span><span class="o">,</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)])</span> <span class="k">=&gt;</span>
+        <span class="k">var</span> <span class="n">count</span> <span class="k">=</span> <span class="mi">0</span>
+        <span class="k">for</span> <span class="o">((</span><span class="n">word</span><span class="o">,</span> <span class="nc">Int</span><span class="o">)</span> <span class="k">&lt;-</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">count</span><span class="o">++</span>
+        <span class="o">}</span>
+        <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+</div>
+
+<p>The above alternative WordCount implementation demonstrates how the GroupCombine
+combines words before performing the GroupReduce transformation. The above
+example is just a proof of concept. Note how the combine step changes the type
+of the DataSet, which would normally require an additional Map transformation
+before executing the GroupReduce.</p>
+
+<h3 id="aggregate-on-grouped-tuple-dataset">Aggregate on Grouped Tuple DataSet</h3>
+
+<p>There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:</p>
+
+<ul>
+  <li>Sum,</li>
+  <li>Min, and</li>
+  <li>Max.</li>
+</ul>
+
+<p>The Aggregate transformation can only be applied on a Tuple DataSet and supports only field position keys for grouping.</p>
+
+<p>The following code shows how to apply an Aggregation transformation on a DataSet grouped by field position keys:</p>
+
+<div class="codetabs">
+  <div data-lang="java">
+
+    <div class="highlight"><pre><code class="language-java"><span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="c1">// [...]</span>
+<span class="n">DataSet</span><span class="o">&lt;</span><span class="n">Tuple3</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">input</span>
+                                   <span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>        <span class="c1">// group DataSet on second field</span>
+                                   <span class="o">.</span><span class="na">aggregate</span><span class="o">(</span><span class="n">SUM</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span> <span class="c1">// compute sum of the first field</span>
+                                   <span class="o">.</span><span class="na">and</span><span class="o">(</span><span class="n">MIN</span><span class="o">,</span> <span class="mi">2</span><span class="o">);</span>      <span class="c1">// compute minimum of the third field</span></code></pre></div>
+
+  </div>
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">String</span>, <span class="kt">Double</span><span class="o">)]</span> <span class="k">=</span> <span class="c1">// [...]</span>
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="mi">1<

<TRUNCATED>