You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/04 13:00:20 UTC

svn commit: r1607832 [28/33] - in /incubator/flink: ./ _includes/ _layouts/ _plugins/ _posts/ blog/ css/ fonts/ img/ img/blog/ js/ site/ site/blog/ site/blog/page2/ site/css/ site/docs/ site/docs/0.6-SNAPSHOT/ site/docs/0.6-SNAPSHOT/css/ site/docs/0.6-...

Added: incubator/flink/site/docs/0.6-SNAPSHOT/scala_api_guide.html
URL: http://svn.apache.org/viewvc/incubator/flink/site/docs/0.6-SNAPSHOT/scala_api_guide.html?rev=1607832&view=auto
==============================================================================
--- incubator/flink/site/docs/0.6-SNAPSHOT/scala_api_guide.html (added)
+++ incubator/flink/site/docs/0.6-SNAPSHOT/scala_api_guide.html Fri Jul  4 11:00:15 2014
@@ -0,0 +1,1225 @@
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <title>Apache Flink (incubating): Scala API Programming Guide</title>
+    <link rel="stylesheet" href="/css/bootstrap.css">
+    <link rel="stylesheet" href="/css/bootstrap-lumen-custom.css">
+    <link href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" rel="stylesheet">
+  </head>
+  <body>
+
+<nav class="navbar navbar-default navbar-static-top" role="navigation">
+  <div class="container">
+    <div class="navbar-header">
+      <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
+        <span class="sr-only">Toggle navigation</span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+      </button>
+      <a class="navbar-brand" href="/index.html">Apache Flink</a>
+    </div>
+
+    <div class="collapse navbar-collapse" id="navbar-collapse-1">
+      <ul class="nav navbar-nav">
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Quickstart <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/docs/0.6-SNAPSHOT/setup_quickstart.html">Setup Flink</a></li>
+            <li><a href="/docs/0.6-SNAPSHOT/java_api_quickstart.html">Java API</a></li>
+            <li><a href="/docs/0.6-SNAPSHOT/scala_api_quickstart.html">Scala API</a></li>
+          </ul>
+        </li>
+
+        <li>
+          <a href="/downloads.html" class="">Downloads</a>
+        </li>
+
+        <li>
+          <a href="/docs/0.6-SNAPSHOT/faq.html" class="">FAQ</a>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Documentation <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/docs/0.6-SNAPSHOT/">0.6-SNAPSHOT</a></li>
+            <li><a href="http://stratosphere-javadocs.github.io/">0.6-SNAPSHOT Javadocs</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Community <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/community.html#mailing-lists">Mailing Lists</a></li>
+            <li><a href="/community.html#issues">Issues</a></li>
+            <li><a href="/community.html#team">Team</a></li>
+            <li class="divider"></li>
+            <li><a href="/how-to-contribute.html">How To Contribute</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">ASF <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="http://www.apache.org/">Apache Software Foundation</a>
+            <li><a href="http://www.apache.org/foundation/how-it-works.html">How it works</a>
+            <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a>
+            <li><a href="http://www.apache.org/foundation/sponsorship.html">Become a Sponsor</a>
+            <li><a href="http://incubator.apache.org/projects/flink.html">Incubation Status page</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Project <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <!--<li><a href="/project.html#history">History</a></li> -->
+            <li><a href="https://wiki.apache.org/incubator/StratosphereProposal">Incubator Proposal (external)</a></li>
+            <li><a href="http://www.apache.org/licenses/LICENSE-2.0">License</a></li>
+            <li><a href="https://github.com/apache/incubator-flink">Source Code</a></li>
+          </ul>
+        </li>
+
+        <li>
+          <a href="/blog/index.html" class="">Blog</a>
+        </li>
+
+      </ul>
+    </div>
+  </div>
+</nav>
+
+    <div class="container">
+
+<div class="row">
+  <div class="col-md-3">
+    <ul>
+      <li><a href="faq.html">FAQ</a></li>
+      <li>Quickstart
+        <ul>
+          <li><a href="setup_quickstart.html">Setup</a></li>
+          <li><a href="run_example_quickstart.html">Run Example</a></li>
+          <li><a href="java_api_quickstart.html">Java API</a></li>
+          <li><a href="scala_api_quickstart.html">Scala API</a></li>
+        </ul>
+      </li>
+
+      <li>Setup &amp; Configuration
+        <ul>
+          <li><a href="local_setup.html">Local Setup</a></li>
+          <li><a href="cluster_setup.html">Cluster Setup</a></li>
+          <li><a href="yarn_setup.html">YARN Setup</a></li>
+          <li><a href="config.html">Configuration</a></li>
+        </ul>
+      </li>
+
+      <li>Programming Guides
+        <ul>
+          <li><a href="java_api_guide.html">Java API</a></li>
+          <li><a href="scala_api_guide.html">Scala API</a></li>
+          <li><a href="hadoop_compatability.html">Hadoop Compatability</a></li>
+          <li><a href="iterations.html">Iterations</a></li>
+          <li><a href="spargel_guide.html">Spargel Graph API</a></li>
+        </ul>
+      </li>
+
+      <li>Examples
+        <ul>
+          <li><a href="java_api_examples.html">Java API</a></li>
+          <li><a href="scala_api_examples.html">Scala API</a></li>
+        </ul>
+      </li>
+
+      <li>Execution
+        <ul>
+          <li><a href="local_execution.html">Local/Debugging</a></li>
+          <li><a href="cluster_execution.html">Cluster</a></li>
+          <li><a href="cli.html">Command-Line Interface</a></li>
+          <li><a href="web_client.html">Web Interface</a></li>
+        </ul>
+      </li>
+
+      <li>Internals
+        <ul>
+          <li><a href="internal_overview.html">Overview</a></li>
+        </ul>
+      </li>
+    </ul>
+  </div>
+  <div class="col-md-9">
+      <h1>Scala API Programming Guide</h1>
+
+      <ul>
+<li>
+<a href="#scala-programming-guide">Scala Programming Guide</a>
+<ul>
+<li>
+<a href="#word-count-example">Word Count Example</a>
+</li>
+<li>
+<a href="#project-setup">Project Setup</a>
+</li>
+<li>
+<a href="#the-dataset-abstraction">The DataSet Abstraction</a>
+</li>
+<li>
+<a href="#data-types">Data Types</a>
+</li>
+<li>
+<a href="#creating-data-sources">Creating Data Sources</a>
+<ul>
+<li>
+<ul>
+<li>
+<a href="#textinputformat">TextInputFormat</a>
+</li>
+<li>
+<a href="#csvinputformat">CsvInputFormat</a>
+</li>
+<li>
+<a href="#delimitedinputformat">DelimitedInputFormat</a>
+</li>
+<li>
+<a href="#binaryinputformat">BinaryInputFormat</a>
+</li>
+<li>
+<a href="#binaryserializedinputformat">BinarySerializedInputFormat</a>
+</li>
+<li>
+<a href="#fixedlengthinputformat">FixedLengthInputFormat</a>
+</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<a href="#operations-on-dataset">Operations on DataSet</a>
+<ul>
+<li>
+<ul>
+<li>
+<a href="#basic-operator-templates">Basic Operator Templates</a>
+</li>
+<li>
+<a href="#field/key-selectors">Field/Key Selectors</a>
+</li>
+<li>
+<a href="#map-operation">Map Operation</a>
+</li>
+<li>
+<a href="#reduce-operation">Reduce Operation</a>
+</li>
+<li>
+<a href="#join-operation">Join Operation</a>
+</li>
+<li>
+<a href="#cogroup-operation">CoGroup Operation</a>
+</li>
+<li>
+<a href="#cross-operation">Cross Operation</a>
+</li>
+<li>
+<a href="#union">Union</a>
+</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<a href="#iterations">Iterations</a>
+<ul>
+<li>
+<ul>
+<li>
+<a href="#bulk-iteration">Bulk Iteration</a>
+</li>
+<li>
+<a href="#delta-iteration">Delta Iteration</a>
+</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<a href="#creating-data-sinks">Creating Data Sinks</a>
+<ul>
+<li>
+<ul>
+<li>
+<a href="#delimitedoutputformat">DelimitedOutputFormat</a>
+</li>
+<li>
+<a href="#csvoutputformat">CsvOutputFormat</a>
+</li>
+<li>
+<a href="#rawoutputformat">RawOutputFormat</a>
+</li>
+<li>
+<a href="#binaryoutputformat">BinaryOutputFormat</a>
+</li>
+<li>
+<a href="#binaryserializedoutputformat">BinarySerializedOutputFormat</a>
+</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<a href="#executing-jobs">Executing Jobs</a>
+</li>
+<li>
+<a href="#rich-functions">Rich Functions</a>
+</li>
+</ul>
+</li>
+</ul>
+
+
+      <p><section id="top"></p>
+
+<h1 id="scala-programming-guide">Scala Programming Guide</h1>
+
+<p>This guide explains how to develop Stratosphere programs with the Scala
+programming interface. </p>
+
+<p>Here we will look at the general structure of a Scala job. You will learn how to
+write data sources, data sinks, and operators to create data flows that can be
+executed using the Stratosphere system.</p>
+
+<p>Writing Scala jobs requires an understanding of Scala, there is excellent
+documentation available <a href="http://scala-lang.org/documentation/">here</a>. Most
+of the examples can be understood by someone with a good understanding
+of programming in general, though.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="intro-example"></p>
+
+<h2 id="word-count-example">Word Count Example</h2>
+
+<p>To start, let&#39;s look at a Word Count job implemented in Scala. This program is
+very simple but it will give you a basic idea of what a Scala job looks like.</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">eu.stratosphere.client.LocalExecutor</span>
+
+<span class="k">import</span> <span class="nn">eu.stratosphere.api.scala._</span>
+<span class="k">import</span> <span class="nn">eu.stratosphere.api.scala.operators._</span>
+
+<span class="k">object</span> <span class="nc">WordCount</span> <span class="o">{</span>
+  <span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span>
+    <span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">TextFile</span><span class="o">(</span><span class="n">textInput</span><span class="o">)</span>
+
+    <span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span> <span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span> <span class="o">}</span>
+
+    <span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">words</span><span class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="k">_</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">word</span> <span class="o">}</span>
+      <span class="o">.</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">(</span><span class="n">w1</span><span class="o">,</span> <span class="n">w2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">w1</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">w1</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="n">w2</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> <span class="o">}</span>
+
+    <span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">counts</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="n">wordsOutput</span><span class="o">,</span> <span class="nc">CsvOutputFormat</span><span class="o">())</span>
+    <span class="k">val</span> <span class="n">plan</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">ScalaPlan</span><span class="o">(</span><span class="nc">Seq</span><span class="o">(</span><span class="n">output</span><span class="o">))</span>
+
+    <span class="nc">LocalExecutor</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="n">plan</span><span class="o">)</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre></div>
+<p>Same as any Stratosphere job a Scala job consists of one or several data
+sources, one or several data sinks and operators in between these that transform
+data. Together these parts are referred to as the data flow graph. It dictates
+the way data is passed when a job is executed.</p>
+
+<p>When using Scala in Stratosphere an important concept to grasp is that of the
+<code>DataSet</code>. <code>DataSet</code> is an abstract concept that represents actual data sets at
+runtime and which has operations that transform data to create a new transformed
+data set. In this example the <code>TextFile(&quot;/some/input&quot;)</code> call creates a
+<code>DataSet[String]</code> that represents the lines of text from the input. The
+<code>flatMap</code> operation that looks like a regular Scala flatMap is in fact an
+operation on <code>DataSet</code> that passes (at runtime) the data items through the
+provided anonymous function to transform them. The result of the <code>flatMap</code>
+operation is a new <code>DataSet</code> that represents the transformed data. On this other
+operations be performed. Another such operation are <code>groupBy</code> and <code>reduce</code>, but
+we will go into details of those later in this guide.</p>
+
+<p>The <code>write</code> operation of <code>DataSet</code> is used to create a data sink. You provide it
+with a path where the data is to be written to and an output format. This is
+enough for now but we will discuss data formats (for sources and sinks) later.</p>
+
+<p>To execute a data flow graph one or several sinks have to wrapped in a <code>Plan</code>
+which can then be executed on a cluster using <code>RemoteExecutor</code>. Here, the
+<code>LocalExecutor</code> is used to run the flow on the local computer. This is useful
+for debugging your job before running it on an actual cluster.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="intro-example"></p>
+
+<h2 id="project-setup">Project Setup</h2>
+
+<p>We will only cover maven here but the concepts should work equivalently with
+other build systems such as Gradle or sbt. When wanting to develop a Scala job
+all that is needed as dependency is is <code>stratosphere-scala</code> (and <code>stratosphere-clients</code>, if
+you want to execute your jobs). So all that needs to be done is to add the
+following lines to your POM.</p>
+<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependencies&gt;</span>
+  <span class="nt">&lt;dependency&gt;</span>
+    <span class="nt">&lt;groupId&gt;</span>eu.stratosphere<span class="nt">&lt;/groupId&gt;</span>
+    <span class="nt">&lt;artifactId&gt;</span>stratosphere-scala<span class="nt">&lt;/artifactId&gt;</span>
+    <span class="nt">&lt;version&gt;</span>0.5.1<span class="nt">&lt;/version&gt;</span>
+  <span class="nt">&lt;/dependency&gt;</span>
+  <span class="nt">&lt;dependency&gt;</span>
+    <span class="nt">&lt;groupId&gt;</span>eu.stratosphere<span class="nt">&lt;/groupId&gt;</span>
+    <span class="nt">&lt;artifactId&gt;</span>stratosphere-clients<span class="nt">&lt;/artifactId&gt;</span>
+    <span class="nt">&lt;version&gt;</span>0.5.1<span class="nt">&lt;/version&gt;</span>
+  <span class="nt">&lt;/dependency&gt;</span>
+<span class="nt">&lt;/dependencies&gt;</span>
+</code></pre></div>
+<p>To quickly get started you can use the Stratosphere Scala quickstart available
+<a href="/quickstart/scala.html">here</a>. This will give you a
+completeMaven project with some working example code that you can use to explore
+the system or as basis for your own projects.</p>
+
+<p>These imports are normally enough for any project:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">eu.stratosphere.api.scala._</span>
+<span class="k">import</span> <span class="nn">eu.stratosphere.api.scala.operators._</span>
+
+<span class="k">import</span> <span class="nn">eu.stratosphere.client.LocalExecutor</span>
+<span class="k">import</span> <span class="nn">eu.stratosphere.client.RemoteExecutor</span>
+</code></pre></div>
+<p>The first two imports contain things like <code>DataSet</code>, <code>Plan</code>, data sources, data
+sinks, and the operations. The last two imports are required if you want to run
+a data flow on your local machine, respectively cluster.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="dataset"></p>
+
+<h2 id="the-dataset-abstraction">The DataSet Abstraction</h2>
+
+<p>As already alluded to in the introductory example you write Scala jobs by using
+operations on a <code>DataSet</code> to create new transformed <code>DataSet</code>. This concept is
+the core of the Stratosphere Scala API so it merits some more explanation. A
+<code>DataSet</code> can look and behave like a regular Scala collection in your code but
+it does not contain any actual data but only represents data. For example: when
+you use <code>TextFile()</code> you get back a <code>DataSource[String]</code> that represents each
+line of text in the input as a <code>String</code>. No data is actually loaded or available
+at this point. The set is only used to apply further operations which themselves
+are not executed until the data flow is executed. An operation on <code>DataSet</code>
+creates a new <code>DataSet</code> that represents the transformation and has a pointer to
+the <code>DataSet</code> that represents the data to be transformed. In this way a tree of
+data sets is created that contains both the specification of the flow of data as
+well as all the transformations. This graph can be wrapped in a <code>Plan</code> and
+executed.</p>
+
+<p>Working with the system is like working with lazy collections, where execution
+is postponed until the user submits the job.</p>
+
+<p><code>DataSet</code> has a generic parameter, this is the type of each data item or record
+that would be processed by further transformations. This is similar to how
+<code>List[A]</code> in Scala would behave. For example in:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span> <span class="k">=</span> <span class="o">...</span>
+<span class="k">val</span> <span class="n">mapped</span> <span class="k">=</span> <span class="n">input</span> <span class="n">map</span> <span class="o">{</span> <span class="n">a</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">a</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">a</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="mi">1</span><span class="o">)}</span>
+</code></pre></div>
+<p>The anonymous function would retrieve in <code>a</code> tuples of type <code>(String, Int)</code>.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="datatypes"></p>
+
+<h2 id="data-types">Data Types</h2>
+
+<p>There are some restrictions regarding the data types that can be used in Scala
+jobs (basically the generic parameter of <code>DataSet</code>). The usable types are
+the primitive Scala types, case classes (which includes tuples), and custom
+data types.</p>
+
+<p>Custom data types must implement the interface
+<a href=https://github.com/apache/incubator-flink/blob/master//stratosphere-core/src/main/java/eu/stratosphere/types/Value.java>Value</a>.
+For custom data types that should also be used as a grouping key or join key
+the <a href=https://github.com/apache/incubator-flink/blob/master//stratosphere-core/src/main/java/eu/stratosphere/types/Key.java>Key</a>
+interface must be implemented.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="data-sources"></p>
+
+<h2 id="creating-data-sources">Creating Data Sources</h2>
+
+<p>To get an initial <code>DataSet</code> on which to perform operations to build a data flow
+graph the following construct is used:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="s">&quot;&lt;file-path&gt;&quot;</span><span class="o">,</span> <span class="o">&lt;</span><span class="n">input</span><span class="o">-</span><span class="n">format</span><span class="o">&gt;)</span>
+</code></pre></div>
+<p>The value <code>input</code> is now a <code>DataSet</code> with the generic type depending on the
+input format.</p>
+
+<p>The file path can be on of either <code>file:///some/file</code> to acces files on the
+local machine or <code>hdfs://some/path</code> to read files from HDFS. The input
+format can be one of our builtin formats or a custom input format. The builtin
+formats are:</p>
+
+<ul>
+<li><a href="#text-input-format">TextInputFormat</a></li>
+<li><a href="#csv-input-format">CsvInputFormat</a></li>
+<li><a href="#delimited-input-format">DelimitedInputFormat</a></li>
+<li><a href="#binary-input-format">BinaryInputFormat</a></li>
+<li><a href="#binary-serialized-input-format">BinarySerializedInputFormat</a></li>
+<li><a href="#fixed-length-input-format">FixedLengthInputFormat</a></li>
+</ul>
+
+<p>We will now have a look at each of them and show how they are employed and in
+which situations.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="text-input-format"></p>
+
+<h4 id="textinputformat">TextInputFormat</h4>
+
+<p>This input format simply reads a text file line wise and creates a <code>String</code>
+for each line. It is used as:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">TextInputFormat</span><span class="o">()</span>
+</code></pre></div>
+<p>As you have already seen in the Word Count Example there is a shortcut for this.
+Instead of using a <code>DataSource</code> with <code>TextInputFormat</code> you can simply write:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">TextFile</span><span class="o">(</span><span class="s">&quot;&lt;file-path&gt;&quot;</span><span class="o">)</span>
+</code></pre></div>
+<p>The <code>input</code> would then be a <code>DataSet[String]</code>.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="csv-input-format"></p>
+
+<h4 id="csvinputformat">CsvInputFormat</h4>
+
+<p>This input format is mainly used to read Csv-Files, as the name suggests. Input
+files must be text files. You can specify the <code>String</code> that should be used
+as the separator between individual records (this would often be newline) and
+also the separator between fields of a record (this would often be a comma).
+The <code>CsvInputFormat</code> will automatically read the records and create
+Scala tuples or custom case class objects for you. The format can be used
+in one of the following ways:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">]()</span>
+<span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">recordDelim</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span>
+<span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">recordDelim</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">fieldDelim</span><span class="k">:</span> <span class="kt">Char</span><span class="o">)</span>
+
+<span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fieldIndices</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Int</span><span class="o">])</span>
+<span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fieldIndices</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Int</span><span class="o">],</span> <span class="n">recordDelim</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span>
+<span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fieldIndices</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Int</span><span class="o">],</span> <span class="n">recordDelim</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">fieldDelim</span><span class="k">:</span> <span class="kt">Char</span><span class="o">)</span>
+</code></pre></div>
+<p>The default record delimiter is a newline, the default field delimiter is a
+comma. The type parameter <code>Out</code> must be a case class type, which also includes
+tuple types since they are internally case classes.</p>
+
+<p>Normally, all the fields of a record are read. If you want to explicitly
+specify which fields of the record should be read you can use one of the
+tree variants with a <code>fieldIndices</code> parameter. Here you give a list
+of the fields that should be read. Field indices start from zero.</p>
+
+<p>An example usage could look as follows:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">CsvInputFormat</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span>, <span class="kt">String</span><span class="o">)](</span><span class="nc">Seq</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">17</span><span class="o">,</span> <span class="mi">42</span><span class="o">),</span> <span class="s">&quot;\n&quot;</span><span class="o">,</span> <span class="sc">&#39;,&#39;</span><span class="o">))</span>
+</code></pre></div>
+<p>Here only the specified fields would be read and 3-tuples created for you.
+The type of input would be <code>DataSet[(Int, Int, String)]</code>.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="delimited-input-format"></p>
+
+<h4 id="delimitedinputformat">DelimitedInputFormat</h4>
+
+<p>This input format is meant for textual records that are separated by
+some delimiter. The delimiter could be a newline, for example. It is used like
+this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">DelimitedInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">parseFunction</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=&gt;</span> <span class="nc">Out</span><span class="o">,</span> <span class="n">delim</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="s">&quot;\n&quot;</span><span class="o">)</span>
+</code></pre></div>
+<p>The input files will be split on the supplied delimiter (or the default newline)
+and the supplied parse function must parse the textual representation in the
+<code>String</code> and return an object. The type of this object will then also be the
+type of the <code>DataSet</code> created by the <code>DataSource</code> operation.</p>
+
+<p>Just as with <code>BinaryInputFormat</code> the function can be an anonymous function, so
+you could have:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">BinaryInputFormat</span><span class="o">(</span> <span class="o">{</span> <span class="n">line</span> <span class="k">=&gt;</span>
+  <span class="n">line</span> <span class="k">match</span> <span class="o">{</span>
+    <span class="k">case</span> <span class="nc">EdgeInputPattern</span><span class="o">(</span><span class="n">from</span><span class="o">,</span> <span class="n">to</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Path</span><span class="o">(</span><span class="n">from</span><span class="o">.</span><span class="n">toInt</span><span class="o">,</span> <span class="n">to</span><span class="o">.</span><span class="n">toInt</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span>
+  <span class="o">}</span>
+<span class="o">}))</span>
+</code></pre></div>
+<p>In this example EdgeInputPattern is some regular expression used for parsing
+a line of text and <code>Path</code> is a custom case class that is used to represent
+the data. The type of input would in this case be <code>DataSet[Path]</code>.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="binary-input-format"></p>
+
+<h4 id="binaryinputformat">BinaryInputFormat</h4>
+
+<p>This input format is best used when you have a custom binary format that
+you store the data in. It is created using one of the following:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">BinaryInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">readFunction</span><span class="k">:</span> <span class="kt">DataInput</span> <span class="o">=&gt;</span> <span class="nc">Out</span><span class="o">)</span>
+<span class="nc">BinaryInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">readFunction</span><span class="k">:</span> <span class="kt">DataInput</span> <span class="o">=&gt;</span> <span class="nc">Out</span><span class="o">,</span> <span class="n">blocksize</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
+</code></pre></div>
+<p>So you have to provide a function that gets a
+<a href="http://docs.oracle.com/javase/7/docs/api/java/io/DataInput.html">java.io.DataInput</a>
+and returns the object that
+contains the data. The type of this object will then also be the type of the
+<code>DataSet</code> created by the <code>DataSource</code> operation.</p>
+
+<p>The provided function can also be an anonymous function, so you could
+have something like this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">BinaryInputFormat</span><span class="o">(</span> <span class="o">{</span> <span class="n">input</span> <span class="k">=&gt;</span>
+  <span class="k">val</span> <span class="n">one</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">readInt</span>
+  <span class="k">val</span> <span class="n">two</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">readDouble</span>
+  <span class="o">(</span><span class="n">one</span><span class="o">,</span> <span class="n">two</span><span class="o">)</span>  
+<span class="o">}))</span>
+</code></pre></div>
+<p>Here <code>input</code> would be of type <code>DataSet[(Int, Double)]</code>.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="binary-serialized-input-format"></p>
+
+<h4 id="binaryserializedinputformat">BinarySerializedInputFormat</h4>
+
+<p>This input format is only meant to be used in conjunction with
+<code>BinarySerializedOutputFormat</code>. You can use these to write elements to files using a
+Stratosphere-internal format that can efficiently be read again. You should only
+use this when output is only meant to be consumed by other Stratosphere jobs.
+The format can be used on one of two ways:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">BinarySerializedInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">]()</span>
+<span class="nc">BinarySerializedInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">blocksize</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
+</code></pre></div>
+<p>So if input files contain elements of type <code>(String, Int)</code> (a tuple type) you
+could use:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">BinarySerializedInputFormat</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]())</span>
+</code></pre></div>
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="fixed-length-input-format"></p>
+
+<h4 id="fixedlengthinputformat">FixedLengthInputFormat</h4>
+
+<p>This input format is for cases where you want to read binary blocks
+of a fixed size. The size of a block must be specified and you must
+provide code that reads elements from a byte array.</p>
+
+<p>The format is used like this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">FixedLengthInputFormat</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">readFunction</span><span class="k">:</span> <span class="o">(</span><span class="kt">Array</span><span class="o">[</span><span class="kt">Byte</span><span class="o">],</span> <span class="nc">Int</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Out</span><span class="o">,</span> <span class="n">recordLength</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
+</code></pre></div>
+<p>The specified function gets an array and a position at which it must start
+reading the array and returns the element read from the binary data.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="operations"></p>
+
+<h2 id="operations-on-dataset">Operations on DataSet</h2>
+
+<p>As explained in <a href="pmodel.html#operators">Programming Model</a>,
+a Stratosphere job is a graph of operators that process data coming from
+sources that is finally written to sinks. When you use the Scala front end
+these operators as well as the graph is created behind the scenes. For example,
+when you write code like this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">TextFile</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">)</span>
+
+<span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">x</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">counts</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="n">words</span><span class="o">,</span> <span class="nc">CsvOutputFormat</span><span class="o">()))</span>
+
+<span class="k">val</span> <span class="n">plan</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">ScalaPlan</span><span class="o">(</span><span class="nc">Seq</span><span class="o">(</span><span class="n">output</span><span class="o">))</span>
+</code></pre></div>
+<p>What you get is a graph that has a data source, a map operator (that contains
+the code written inside the anonymous function block), and a data sink. You 
+do not have to know about this to be able to use the Scala front end but
+it helps to remember, that when you are using Scala you are building
+a data flow graph that processes data only when executed.</p>
+
+<p>There are operations on <code>DataSet</code> that correspond to all the types of operators
+that the Stratosphere system supports. We will shortly go trough all of them with
+some examples.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="operator-templates"></p>
+
+<h4 id="basic-operator-templates">Basic Operator Templates</h4>
+
+<p>Most of the operations have three similar versions and we will
+explain them here for all of the operators together. The three versions are <code>map</code>,
+<code>flatMap</code>, and <code>filter</code>. All of them accept an anonymous function that
+defines what the operation does but the semantics are different.</p>
+
+<p>The <code>map</code> version is a simple one to one mapping. Take a look at the following
+code:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+
+<span class="k">val</span> <span class="n">mapped</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">x</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="mi">3</span><span class="o">)</span> <span class="o">}</span>
+</code></pre></div>
+<p>This defines a map operator that operates on tuples of String and Int and just
+adds three to the Int (the second fields of the tuple). So, if the input set had
+the tuples (a, 1), (b, 2), and (c, 3) the result after the operator would be
+(a, 4), (b, 5), and (c, 6).</p>
+
+<p>The <code>flatMap</code> version works a bit differently,
+here you return something iterable from the anonymous function. The iterable
+could be a list or an array. The elements in this iterable are unnested.
+So for every element in the input data you get a list of elements. The
+concatenation of those is the result of the operator. If you had
+the following code:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+
+<span class="k">val</span> <span class="n">mapped</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="nc">List</span><span class="o">(</span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">x</span><span class="o">.</span><span class="n">_2</span><span class="o">),</span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">x</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="mi">1</span><span class="o">)</span> <span class="o">)</span> <span class="o">}</span>
+</code></pre></div>
+<p>and as input the tuples (a, 1) and (b, 1) you would get (a, 1), (a, 2), (b, 1),
+and (b, 2) as result. It is one flat list, and not the individual lists returned
+from the anonymous function.</p>
+
+<p>The third template is <code>filter</code>. Here you give an anonymous function that
+returns a Boolean. The elements for which this Boolean is true are part of the
+result of the operation, the others are culled. An example for a filter is this
+code:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+
+<span class="k">val</span> <span class="n">mapped</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">filter</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="n">x</span><span class="o">.</span><span class="n">_2</span> <span class="o">&gt;=</span> <span class="mi">3</span> <span class="o">}</span>
+</code></pre></div>
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="key-selectors"></p>
+
+<h4 id="field/key-selectors">Field/Key Selectors</h4>
+
+<p>For some operations (group, join, and cogroup) it is necessary to specify which
+parts of a data type are to be considered the key. This key is used for grouping
+elements together for reduce and for joining in case of a join or cogroup.
+In Scala the key is specified using a special anonymous function called
+a key selector. The key selector has as input an element of the type of
+the <code>DataSet</code> and must return a single value or a tuple of values that should
+be considered the key. This will become clear with some examples: (Note that
+we use the reduce operation here as an example, we will have a look at
+that further down):</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+<span class="k">val</span> <span class="n">reduced</span> <span class="k">=</span> <span class="n">input</span> <span class="n">groupBy</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">_1</span><span class="o">)</span> <span class="o">}</span> <span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+<span class="k">val</span> <span class="n">reduced2</span> <span class="k">=</span> <span class="n">input</span> <span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">w</span><span class="o">,</span> <span class="n">c</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">w</span> <span class="o">}</span> <span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+
+<span class="k">case</span> <span class="k">class</span> <span class="nc">Test</span><span class="o">(</span><span class="n">a</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">b</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">c</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">input2</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Test</span><span class="o">]</span>
+<span class="k">val</span> <span class="n">reduced3</span> <span class="k">=</span> <span class="n">input2</span> <span class="n">groupBy</span> <span class="o">{</span> <span class="n">x</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="n">a</span><span class="o">,</span> <span class="n">x</span><span class="o">.</span><span class="n">b</span><span class="o">)</span> <span class="o">}</span> <span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+<span class="k">val</span> <span class="n">reduced4</span> <span class="k">=</span> <span class="n">input2</span> <span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="nc">Test</span><span class="o">(</span><span class="n">x</span><span class="o">,</span><span class="n">y</span><span class="o">,</span><span class="n">z</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">x</span><span class="o">,</span><span class="n">y</span><span class="o">)</span> <span class="o">}</span> <span class="n">reduce</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
+</code></pre></div>
+<p>The anonymous function block passed to <code>groupBy</code> is the key selector. The first
+two examples both specify the <code>String</code> field of the tuple as key. In the second
+set of examples we see a custom case class and here we select the first two
+fields as a compound key.</p>
+
+<p>It is worth noting that the key selector function is not actually executed 
+at runtime but it is parsed at job creation time where the key information is
+extracted and stored for efficient computation at runtime.</p>
+
+<h4 id="map-operation">Map Operation</h4>
+
+<p>Map is an operation that gets one element at a time and can output one or
+several elements. The operations that result in a <code>MapOperator</code> in the graph are exactly
+those mentioned in the previous section. For completeness&#39; sake we will mention
+their signatures here (in this and the following such lists <code>In</code> is the
+type of the input data set, <code>DataSet[In]</code>):</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">map</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="kt">In</span> <span class="o">=&gt;</span> <span class="nc">Out</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+<span class="k">def</span> <span class="n">flatMap</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="kt">In</span> <span class="o">=&gt;</span> <span class="nc">Iterator</span><span class="o">[</span><span class="kt">Out</span><span class="o">])</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+<span class="k">def</span> <span class="n">filter</span><span class="o">(</span><span class="n">fun</span><span class="k">:</span> <span class="kt">In</span> <span class="o">=&gt;</span> <span class="nc">Boolean</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+</code></pre></div>
+<h4 id="reduce-operation">Reduce Operation</h4>
+
+<p>Reduce is an operation that looks
+at groups of elements at a time and can, for one group, output one or several
+elements. To specify how elements should be grouped you need to give
+a key selection function, as explained <a href="#key-selectors">above</a>.</p>
+
+<p>The basic template of the reduce operation is:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">input</span> <span class="n">groupBy</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span><span class="o">&gt;</span> <span class="o">}</span> <span class="n">reduce</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">reduce</span> <span class="n">function</span><span class="o">&gt;</span> <span class="o">}</span>
+</code></pre></div>
+<p>The signature of the reduce function depends on the variety of reduce operation
+selected. There are right now three different versions:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">reduce</span><span class="o">(</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">In</span><span class="o">,</span> <span class="kt">In</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">In</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">In</span><span class="o">]</span>
+
+<span class="k">def</span> <span class="n">reduceGroup</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="kt">Iterator</span><span class="o">[</span><span class="kt">In</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">Out</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+<span class="k">def</span> <span class="n">combinableReduceGroup</span><span class="o">(</span><span class="n">fun</span><span class="k">:</span> <span class="kt">Iterator</span><span class="o">[</span><span class="kt">In</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">In</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">In</span><span class="o">]</span>
+</code></pre></div>
+<p>The <code>reduce</code> variant is like a <code>reduceLeft</code> on a Scala collection with
+the limitation that the output data type must be the same as the input data
+type. You specify how to elements of the selection should be combined,
+this is then used to reduce the elements in one group (of the same key)
+down to one element. This can be used to implement aggregation operators,
+for example:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">words</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">words</span><span class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">word</span><span class="o">}</span>
+  <span class="o">.</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">(</span><span class="n">w1</span><span class="o">,</span> <span class="n">w1</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">w1</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">w1</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="n">w2</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> <span class="o">}</span>
+</code></pre></div>
+<p>This would add up the Int fields of those tuples that have the same String
+in the first fields. As is for example required in Word Count.</p>
+
+<p>The <code>reduceGroup</code> variant can be used when more control is required. Here
+your reduce function gets an <code>Iterator</code> that can be used to iterate over
+all the elements in a group. With this type or reduce operation the
+output data type can be different from the input data type. An example
+of this kind of operation is this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">words</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+<span class="k">val</span> <span class="n">minCounts</span> <span class="k">=</span> <span class="n">words</span><span class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">word</span><span class="o">}</span>
+  <span class="o">.</span><span class="n">reduceGroup</span> <span class="o">{</span> <span class="n">words</span> <span class="k">=&gt;</span> <span class="n">words</span><span class="o">.</span><span class="n">minBy</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">_2</span> <span class="o">}</span> <span class="o">}</span>
+</code></pre></div>
+<p>Here we use the minBy function of Scala collections to determine the
+element with the minimum count in a group.</p>
+
+<p>The <code>combinableGroupReduce</code> works like the <code>groupReduce</code> with the difference
+that the reduce operation is combinable. This is an optimization one can use,
+please have a look at <a href="pmodel.html" title="Programming Model">Programming Model</a> for
+the details.</p>
+
+<h4 id="join-operation">Join Operation</h4>
+
+<p>The join operation is similar to a database equi-join. It is a two input
+iteration where you have to specify a key selector for each of the inputs
+and then the anonymous function is called for every pair of matching
+elements from the two input sides.</p>
+
+<p>The basic template is:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">input1</span> <span class="n">join</span> <span class="n">input2</span> <span class="n">where</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span> <span class="mi">1</span><span class="o">&gt;</span> <span class="o">}</span> <span class="n">isEqualTo</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span> <span class="mi">2</span><span class="o">&gt;}</span> <span class="n">map</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">join</span> <span class="n">function</span><span class="o">&gt;</span> <span class="o">}</span>
+</code></pre></div>
+<p>or, because lines will get to long fast:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">input1</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">input2</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">where</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span> <span class="mi">1</span><span class="o">&gt;</span> <span class="o">}</span>
+  <span class="o">.</span><span class="n">isEqualTo</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span> <span class="mi">2</span><span class="o">&gt;}</span>
+  <span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">join</span> <span class="n">function</span><span class="o">&gt;</span> <span class="o">}</span>
+</code></pre></div>
+<p>(Scala can sometimes be quite finicky about where you can omit dots and
+parentheses, so it&#39;s best to use dots in multi-line code like this.)</p>
+
+<p>As mentioned in <a href="#operator-templates">here</a> there are three versions of
+this operator, so you can use one of these in the last position:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">map</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">LeftIn</span><span class="o">,</span> <span class="kt">RightIn</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Out</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+<span class="k">def</span> <span class="n">flatMap</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">LeftIn</span><span class="o">,</span> <span class="kt">RightIn</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Iterator</span><span class="o">[</span><span class="kt">Out</span><span class="o">])</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+<span class="k">def</span> <span class="n">filter</span><span class="o">(</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">LeftIn</span><span class="o">,</span> <span class="kt">RightIn</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Boolean</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">LeftIn</span>, <span class="kt">RightIn</span><span class="o">)]</span>
+</code></pre></div>
+<p>One example where this can be used is database-style joining with projection:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">input1</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">input2</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">where</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">,</span> <span class="n">c</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="o">}</span>
+  <span class="o">.</span><span class="n">isEqualTo</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">,</span> <span class="n">c</span><span class="o">,</span> <span class="n">d</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">c</span><span class="o">,</span> <span class="n">d</span><span class="o">)</span> <span class="o">}</span>
+  <span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">left</span><span class="o">.</span><span class="n">_3</span><span class="o">,</span> <span class="n">right</span><span class="o">.</span><span class="n">_1</span><span class="o">)</span> <span class="o">}</span>
+</code></pre></div>
+<p>Here the join key for the left input is a compound of the first two tuple fields
+while the key for the second input is a compound of the last two fields. We then
+pick one field each from both sides as the result of the operation.</p>
+
+<h4 id="cogroup-operation">CoGroup Operation</h4>
+
+<p>The cogroup operation is a cross between join and reduce. It has two inputs
+and you have to specify a key selector for each of them. This is where the
+similarities with join stop. Instead of having one invocation of your user
+code per pair of matching elements all elements from the left and from the right
+are grouped together for one single invocation. In your function you get
+an <code>Iterator</code> for the elements from the left input and another <code>Iterator</code>
+for the elements from the right input.</p>
+
+<p>The basic template is:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">input1</span> <span class="n">cogroup</span> <span class="n">input2</span> <span class="n">where</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span> <span class="mi">1</span><span class="o">&gt;</span> <span class="o">}</span> <span class="n">isEqualTo</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span> <span class="mi">2</span><span class="o">&gt;}</span> <span class="n">map</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">cogroup</span> <span class="n">function</span><span class="o">&gt;</span> <span class="o">}</span>
+</code></pre></div>
+<p>or, because lines will get to long fast:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">input1</span><span class="o">.</span><span class="n">cogroup</span><span class="o">(</span><span class="n">input2</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">where</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span> <span class="mi">1</span><span class="o">&gt;</span> <span class="o">}</span>
+  <span class="o">.</span><span class="n">isEqualTo</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">key</span> <span class="n">selector</span> <span class="mi">2</span><span class="o">&gt;}</span>
+  <span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">cogroup</span> <span class="n">function</span><span class="o">&gt;</span> <span class="o">}</span>
+</code></pre></div>
+<p>There are to variants you can use, with the semantics explained
+<a href="#operator-templates">here</a>.</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">map</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">Iterator</span><span class="o">[</span><span class="kt">LeftIn</span><span class="o">],</span> <span class="nc">Iterator</span><span class="o">[</span><span class="kt">RightIn</span><span class="o">])</span> <span class="k">=&gt;</span> <span class="nc">Out</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+<span class="k">def</span> <span class="n">flatMap</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">Iterator</span><span class="o">[</span><span class="kt">LeftIn</span><span class="o">],</span> <span class="nc">Iterator</span><span class="o">[</span><span class="kt">RightIn</span><span class="o">])</span> <span class="k">=&gt;</span> <span class="nc">Iterator</span><span class="o">[</span><span class="kt">Out</span><span class="o">])</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+</code></pre></div>
+<h4 id="cross-operation">Cross Operation</h4>
+
+<p>The cross operation is used to form the Cartesian product of the elements
+from two inputs. The basic template is:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">input1</span> <span class="n">cross</span> <span class="n">input2</span> <span class="n">map</span> <span class="o">{</span> <span class="o">&lt;</span><span class="n">cogroup</span> <span class="n">function</span><span class="o">&gt;</span> <span class="o">}</span>
+</code></pre></div>
+<p>Again there are three variants, with the semantics explained
+<a href="#operator-templates">here</a>.</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">map</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">LeftIn</span><span class="o">,</span> <span class="kt">RightIn</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Out</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+<span class="k">def</span> <span class="n">flatMap</span><span class="o">[</span><span class="kt">Out</span><span class="o">](</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">LeftIn</span><span class="o">,</span> <span class="kt">RightIn</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Iterator</span><span class="o">[</span><span class="kt">Out</span><span class="o">])</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">Out</span><span class="o">]</span>
+<span class="k">def</span> <span class="n">filter</span><span class="o">(</span><span class="n">fun</span><span class="k">:</span> <span class="o">(</span><span class="kt">LeftIn</span><span class="o">,</span> <span class="kt">RightIn</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Boolean</span><span class="o">)</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">LeftIn</span>, <span class="kt">RightIn</span><span class="o">)]</span>
+</code></pre></div>
+<h4 id="union">Union</h4>
+
+<p>When you want to have the combination of several data sets as the input of
+an operation you can use a union to combine them. It is used like this</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">input1</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span>
+<span class="k">val</span> <span class="n">input2</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span>
+<span class="k">val</span> <span class="n">unioned</span> <span class="k">=</span> <span class="n">input1</span><span class="o">.</span><span class="n">union</span><span class="o">(</span><span class="n">input2</span><span class="o">)</span>
+</code></pre></div>
+<p>The signature of union is:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">union</span><span class="o">(</span><span class="n">secondInput</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">A</span><span class="o">])</span>
+</code></pre></div>
+<p>Where <code>A</code> is the generic type of the <code>DataSet</code> on which you execute the <code>union</code>.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="iterations"></p>
+
+<h2 id="iterations">Iterations</h2>
+
+<p>Iterations allow you to implement <em>loops</em> in Stratosphere programs.
+<a href="iterations.html">This page</a> gives a
+general introduction to iterations. This section here provides quick examples
+of how to use the concepts using the Scala API.
+The iteration operators encapsulate a part of the program and execute it
+repeatedly, feeding back the result of one iteration (the partial solution) into
+the next iteration. Stratosphere has two different types of iterations,
+<em>Bulk Iteration</em> and <em>Delta Iteration</em>.</p>
+
+<p>For both types of iterations you provide the iteration body as a function
+that has data sets as input and returns a new data set. The difference is
+that bulk iterations map from one data set two one new data set while
+delta iterations map two data sets to two new data sets.</p>
+
+<h4 id="bulk-iteration">Bulk Iteration</h4>
+
+<p>The signature of the bulk iterate method is this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">iterate</span><span class="o">(</span><span class="n">n</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">stepFunction</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">A</span><span class="o">]</span> <span class="k">=&gt;</span> <span class="nc">DataSet</span><span class="o">[</span><span class="kt">A</span><span class="o">])</span>
+</code></pre></div>
+<p>where <code>A</code> is the type of the <code>DataSet</code> on which <code>iterate</code> is called. The number
+of steps is given in <code>n</code>. This is how you use it in practice:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">dataPoints</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="n">dataPointInput</span><span class="o">,</span> <span class="nc">DelimitedInputFormat</span><span class="o">(</span><span class="n">parseInput</span><span class="o">))</span>
+<span class="k">val</span> <span class="n">clusterPoints</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="n">clusterInput</span><span class="o">,</span> <span class="nc">DelimitedInputFormat</span><span class="o">(</span><span class="n">parseInput</span><span class="o">))</span>
+
+<span class="k">def</span> <span class="n">kMeansStep</span><span class="o">(</span><span class="n">centers</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Point</span><span class="o">)])</span> <span class="k">=</span> <span class="o">{</span>
+
+  <span class="k">val</span> <span class="n">distances</span> <span class="k">=</span> <span class="n">dataPoints</span> <span class="n">cross</span> <span class="n">centers</span> <span class="n">map</span> <span class="n">computeDistance</span>
+  <span class="k">val</span> <span class="n">nearestCenters</span> <span class="k">=</span> <span class="n">distances</span><span class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">pid</span><span class="o">,</span> <span class="k">_</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">pid</span> <span class="o">}</span>
+    <span class="o">.</span><span class="n">reduceGroup</span> <span class="o">{</span> <span class="n">ds</span> <span class="k">=&gt;</span> <span class="n">ds</span><span class="o">.</span><span class="n">minBy</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_2</span><span class="o">.</span><span class="n">distance</span><span class="o">)</span> <span class="o">}</span> <span class="n">map</span> <span class="n">asPointSum</span><span class="o">.</span><span class="n">tupled</span>
+  <span class="k">val</span> <span class="n">newCenters</span> <span class="k">=</span> <span class="n">nearestCenters</span><span class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">cid</span><span class="o">,</span> <span class="k">_</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">cid</span> <span class="o">}</span>
+    <span class="o">.</span><span class="n">reduceGroup</span> <span class="n">sumPointSums</span> <span class="n">map</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">cid</span><span class="o">,</span> <span class="n">pSum</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">cid</span> <span class="o">-&gt;</span> <span class="n">pSum</span><span class="o">.</span><span class="n">toPoint</span><span class="o">()</span> <span class="o">}</span>
+
+  <span class="n">newCenters</span>
+<span class="o">}</span>
+
+<span class="k">val</span> <span class="n">finalCenters</span> <span class="k">=</span> <span class="n">clusterPoints</span><span class="o">.</span><span class="n">iterate</span><span class="o">(</span><span class="n">numIterations</span><span class="o">,</span> <span class="n">kMeansStep</span><span class="o">)</span>
+
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">finalCenters</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="n">clusterOutput</span><span class="o">,</span> <span class="nc">DelimitedOutputFormat</span><span class="o">(</span><span class="n">formatOutput</span><span class="o">.</span><span class="n">tupled</span><span class="o">))</span>
+</code></pre></div>
+<p>Not that we use some functions here which we don&#39;t show. If you want, you
+can check out the complete code in our KMeans example.</p>
+
+<h4 id="delta-iteration">Delta Iteration</h4>
+
+<p>The signature of the delta iterate method is this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">iterateWithDelta</span><span class="o">(</span><span class="n">workset</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">W</span><span class="o">],</span> <span class="n">solutionSetKey</span><span class="k">:</span> <span class="kt">A</span> <span class="o">=&gt;</span> <span class="n">K</span><span class="o">,</span> <span class="n">stepFunction</span><span class="k">:</span> <span class="o">(</span><span class="kt">DataSet</span><span class="o">[</span><span class="kt">A</span><span class="o">],</span> <span class="nc">DataSet</span><span class="o">[</span><span class="kt">W</span><span class="o">])</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="nc">DataSet</span><span class="o">[</span><span class="kt">A</span><span class="o">],</span> <span class="nc">DataSet</span><span class="o">[</
 span><span class="kt">W</span><span class="o">]),</span> <span class="n">maxIterations</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
+</code></pre></div>
+<p>where <code>A</code> is the type of the <code>DataSet</code> on which <code>iterateWithDelta</code> is called,
+<code>W</code> is the type of the <code>DataSet</code> that represents the workset and <code>K</code> is the
+key type. The maximum number of iterations must always be given.</p>
+
+<p>For information on how delta iterations in general work on our system, please
+refer to <a href="iterations.html">iterations</a>. A working example job is
+available here:
+<a href="examples_scala.html#connected_components">Scala Connected Components Example</a> </p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="data-sinks"></p>
+
+<h2 id="creating-data-sinks">Creating Data Sinks</h2>
+
+<p>The creation of data sinks is analog to the creation of data sources. <code>DataSet</code>
+has a <code>write</code> method that is used to create a sink that writes the output
+of the operation to a file in the local file system or HDFS. The general pattern
+is this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="n">out</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="s">&quot;&lt;file-path&gt;&quot;</span><span class="o">,</span> <span class="o">&lt;</span><span class="n">output</span><span class="o">-</span><span class="n">format</span><span class="o">&gt;)</span>
+</code></pre></div>
+<p>Where <code>out</code> is some <code>DataSet</code>. Just as for data sources, the file path can be
+on of either <code>file:///some/file</code> to acces files on the local machine or
+<code>hdfs://some/path</code> to read files from HDFS. The output format can be one of our
+builtin formats or a custom output format. The builtin formats are:</p>
+
+<ul>
+<li><a href="#delimited-output-format">DelimitedOutputFormat</a></li>
+<li><a href="#csv-output-format">CsvOutputFormat</a></li>
+<li><a href="#raw-output-format">RawOutputFormat</a></li>
+<li><a href="#binary-output-format">BinaryOutputFormat</a></li>
+<li><a href="#binary-serialized-output-format">BinarySerializedOutputFormat</a></li>
+</ul>
+
+<p>We will now have a look at each of them and show how they are employed and in
+which situations.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="delimited-output-format"></p>
+
+<h4 id="delimitedoutputformat">DelimitedOutputFormat</h4>
+
+<p>This output format is meant for writing textual records that are separated by
+some delimiter. The delimiter could be a newline, for example. It is used like
+this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">DelimitedOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">](</span><span class="n">formatFunction</span><span class="k">:</span> <span class="kt">In</span> <span class="o">=&gt;</span> <span class="nc">String</span><span class="o">,</span> <span class="n">delim</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="s">&quot;\n&quot;</span><span class="o">)</span>
+</code></pre></div>
+<p>For every element in the <code>DataSet</code> the formatting function is called and
+the result of that is appended to the output file. In between the elements
+the <code>delim</code> string is inserted.</p>
+
+<p>An example would be:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">out</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+<span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="n">out</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">DelimitedOutputFormat</span><span class="o">(</span> <span class="o">{</span> <span class="n">elem</span> <span class="k">=&gt;</span>
+  <span class="s">&quot;%s|%d&quot;</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="n">elem</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">elem</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span>
+<span class="o">}))</span>
+</code></pre></div>
+<p>Here we use Scala String formatting to write the two fields of the tuple
+separated by a pipe character. The default newline delimiter will be inserted
+between the elements in the output files.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="csv-output-format"></p>
+
+<h4 id="csvoutputformat">CsvOutputFormat</h4>
+
+<p>This output format can be used to automatically write fields of tuple
+elements or case classes to CSV files. You can specify what separator should
+be used between fields of an element and also the separator between elements.</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">CsvOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">]()</span>
+<span class="nc">CsvOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">](</span><span class="n">recordDelim</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span>
+<span class="nc">CsvOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">](</span><span class="n">recordDelim</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">fieldDelim</span><span class="k">:</span> <span class="kt">Char</span><span class="o">)</span>
+</code></pre></div>
+<p>The default record delimiter is a newline, the default field delimiter is a
+comma. </p>
+
+<p>An example usage could look as follows:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">out</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+<span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="n">out</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">CsvOutputFormat</span><span class="o">())</span>
+</code></pre></div>
+<p>Notice how we don&#39;t need to specify the generic type here, it is inferred.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="raw-output-format"></p>
+
+<h4 id="rawoutputformat">RawOutputFormat</h4>
+
+<p>This input format can be used when you want to have complete control over
+what gets written. You get an
+<a href="http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html">OutputStream</a>
+and can write the elements of the <code>DataSet</code> exactly as you see fit.</p>
+
+<p>A <code>RawOutputFormat</code> is created like this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">RawOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">](</span><span class="n">writeFunction</span><span class="k">:</span> <span class="o">(</span><span class="kt">In</span><span class="o">,</span> <span class="kt">OutputStream</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Unit</span><span class="o">)</span>
+</code></pre></div>
+<p>The function you pass in gets one element from the <code>DataSet</code> and must
+write it to the given <code>OutputStream</code>. An example would be the following:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">out</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+<span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="n">out</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">RawOutputFormat</span><span class="o">(</span> <span class="o">{</span> <span class="o">(</span><span class="n">elem</span><span class="o">,</span> <span class="n">output</span><span class="o">)</span> <span class="k">=&gt;</span>
+  <span class="cm">/* write elem._1 and elem._2 to output */</span> 
+<span class="o">}))</span>
+</code></pre></div>
+<p><section id="binary-output-format"></p>
+
+<h4 id="binaryoutputformat">BinaryOutputFormat</h4>
+
+<p>This format is very similar to the RawOutputFormat. The difference is that
+instead of an <a href="http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html">OutputStream</a>
+you get a <a href="http://docs.oracle.com/javase/7/docs/api/java/io/DataOutput.html">DataOutput</a>
+to which you can write binary data. You can also specify the block size for
+the binary output file. When you don&#39;t specify a block size some default
+is used.</p>
+
+<p>A <code>BinaryOutputFormat</code> is created like this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">BinaryOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">](</span><span class="n">writeFunction</span><span class="k">:</span> <span class="o">(</span><span class="kt">In</span><span class="o">,</span> <span class="kt">DataOutput</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Unit</span><span class="o">)</span>
+<span class="nc">BinaryOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">](</span><span class="n">writeFunction</span><span class="k">:</span> <span class="o">(</span><span class="kt">In</span><span class="o">,</span> <span class="kt">DataOutput</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">Unit</span><span class="o">,</span> <span class="n">blockSize</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
+</code></pre></div>
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="binary-serialized-output-format"></p>
+
+<h4 id="binaryserializedoutputformat">BinarySerializedOutputFormat</h4>
+
+<p>This output format is only meant to be used in conjunction with
+<code>BinarySerializedInputFormat</code>. You can use these to write elements to files using a
+Stratosphere-internal format that can efficiently be read again. You should only
+use this when output is only meant to be consumed by other Stratosphere jobs.
+The output format can be used on one of two ways:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nc">BinarySerializedOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">]()</span>
+<span class="nc">BinarySerializedOutputFormat</span><span class="o">[</span><span class="kt">In</span><span class="o">](</span><span class="n">blocksize</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
+</code></pre></div>
+<p>So to write elements of some <code>DataSet[A]</code> to a binary file you could use:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">out</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+<span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="n">out</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">BinarySerializedInputFormat</span><span class="o">())</span>
+</code></pre></div>
+<p>As you can see the type of the elements need not be specified, it is inferred
+by Scala.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<p><section id="execution"></p>
+
+<h2 id="executing-jobs">Executing Jobs</h2>
+
+<p>To execute a data flow graph the sinks need to be wrapped in a <a href=https://github.com/apache/incubator-flink/blob/master//stratosphere-scala/src/main/scala/eu/stratosphere/api/scala/ScalaPlan.scala>ScalaPlan</a> object like this:</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">out</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[(</span><span class="kt">String</span>, <span class="kt">Int</span><span class="o">)]</span>
+<span class="k">val</span> <span class="n">sink</span> <span class="k">=</span> <span class="n">out</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="s">&quot;file:///some/file&quot;</span><span class="o">,</span> <span class="nc">CsvOutputFormat</span><span class="o">())</span>
+

[... 164 lines stripped ...]