You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/04 13:00:20 UTC

svn commit: r1607832 [27/33] - in /incubator/flink: ./ _includes/ _layouts/ _plugins/ _posts/ blog/ css/ fonts/ img/ img/blog/ js/ site/ site/blog/ site/blog/page2/ site/css/ site/docs/ site/docs/0.6-SNAPSHOT/ site/docs/0.6-SNAPSHOT/css/ site/docs/0.6-...

Added: incubator/flink/site/docs/0.6-SNAPSHOT/scala_api_examples.html
URL: http://svn.apache.org/viewvc/incubator/flink/site/docs/0.6-SNAPSHOT/scala_api_examples.html?rev=1607832&view=auto
==============================================================================
--- incubator/flink/site/docs/0.6-SNAPSHOT/scala_api_examples.html (added)
+++ incubator/flink/site/docs/0.6-SNAPSHOT/scala_api_examples.html Fri Jul  4 11:00:15 2014
@@ -0,0 +1,415 @@
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <title>Apache Flink (incubating): Scala API Examples</title>
+    <link rel="stylesheet" href="/css/bootstrap.css">
+    <link rel="stylesheet" href="/css/bootstrap-lumen-custom.css">
+    <link href="//maxcdn.bootstrapcdn.com/font-awesome/4.1.0/css/font-awesome.min.css" rel="stylesheet">
+  </head>
+  <body>
+
+<nav class="navbar navbar-default navbar-static-top" role="navigation">
+  <div class="container">
+    <div class="navbar-header">
+      <button type="button" class="navbar-toggle" data-toggle="collapse" data-target=".navbar-collapse">
+        <span class="sr-only">Toggle navigation</span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+      </button>
+      <a class="navbar-brand" href="/index.html">Apache Flink</a>
+    </div>
+
+    <div class="collapse navbar-collapse" id="navbar-collapse-1">
+      <ul class="nav navbar-nav">
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Quickstart <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/docs/0.6-SNAPSHOT/setup_quickstart.html">Setup Flink</a></li>
+            <li><a href="/docs/0.6-SNAPSHOT/java_api_quickstart.html">Java API</a></li>
+            <li><a href="/docs/0.6-SNAPSHOT/scala_api_quickstart.html">Scala API</a></li>
+          </ul>
+        </li>
+
+        <li>
+          <a href="/downloads.html" class="">Downloads</a>
+        </li>
+
+        <li>
+          <a href="/docs/0.6-SNAPSHOT/faq.html" class="">FAQ</a>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Documentation <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/docs/0.6-SNAPSHOT/">0.6-SNAPSHOT</a></li>
+            <li><a href="http://stratosphere-javadocs.github.io/">0.6-SNAPSHOT Javadocs</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Community <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="/community.html#mailing-lists">Mailing Lists</a></li>
+            <li><a href="/community.html#issues">Issues</a></li>
+            <li><a href="/community.html#team">Team</a></li>
+            <li class="divider"></li>
+            <li><a href="/how-to-contribute.html">How To Contribute</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">ASF <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="http://www.apache.org/">Apache Software Foundation</a></li>
+            <li><a href="http://www.apache.org/foundation/how-it-works.html">How it works</a></li>
+            <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li>
+            <li><a href="http://www.apache.org/foundation/sponsorship.html">Become a Sponsor</a></li>
+            <li><a href="http://incubator.apache.org/projects/flink.html">Incubation Status page</a></li>
+          </ul>
+        </li>
+
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Project <b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <!--<li><a href="/project.html#history">History</a></li> -->
+            <li><a href="https://wiki.apache.org/incubator/StratosphereProposal">Incubator Proposal (external)</a></li>
+            <li><a href="http://www.apache.org/licenses/LICENSE-2.0">License</a></li>
+            <li><a href="https://github.com/apache/incubator-flink">Source Code</a></li>
+          </ul>
+        </li>
+
+        <li>
+          <a href="/blog/index.html" class="">Blog</a>
+        </li>
+
+      </ul>
+    </div>
+  </div>
+</nav>
+
+    <div class="container">
+
+<div class="row">
+  <div class="col-md-3">
+    <ul>
+      <li><a href="faq.html">FAQ</a></li>
+      <li>Quickstart
+        <ul>
+          <li><a href="setup_quickstart.html">Setup</a></li>
+          <li><a href="run_example_quickstart.html">Run Example</a></li>
+          <li><a href="java_api_quickstart.html">Java API</a></li>
+          <li><a href="scala_api_quickstart.html">Scala API</a></li>
+        </ul>
+      </li>
+
+      <li>Setup &amp; Configuration
+        <ul>
+          <li><a href="local_setup.html">Local Setup</a></li>
+          <li><a href="cluster_setup.html">Cluster Setup</a></li>
+          <li><a href="yarn_setup.html">YARN Setup</a></li>
+          <li><a href="config.html">Configuration</a></li>
+        </ul>
+      </li>
+
+      <li>Programming Guides
+        <ul>
+          <li><a href="java_api_guide.html">Java API</a></li>
+          <li><a href="scala_api_guide.html">Scala API</a></li>
+          <li><a href="hadoop_compatability.html">Hadoop Compatibility</a></li>
+          <li><a href="iterations.html">Iterations</a></li>
+          <li><a href="spargel_guide.html">Spargel Graph API</a></li>
+        </ul>
+      </li>
+
+      <li>Examples
+        <ul>
+          <li><a href="java_api_examples.html">Java API</a></li>
+          <li><a href="scala_api_examples.html">Scala API</a></li>
+        </ul>
+      </li>
+
+      <li>Execution
+        <ul>
+          <li><a href="local_execution.html">Local/Debugging</a></li>
+          <li><a href="cluster_execution.html">Cluster</a></li>
+          <li><a href="cli.html">Command-Line Interface</a></li>
+          <li><a href="web_client.html">Web Interface</a></li>
+        </ul>
+      </li>
+
+      <li>Internals
+        <ul>
+          <li><a href="internal_overview.html">Overview</a></li>
+        </ul>
+      </li>
+    </ul>
+  </div>
+  <div class="col-md-9">
+      <h1>Scala API Examples</h1>
+
+      <ul>
+<li>
+<a href="#word-count">Word Count</a>
+</li>
+<li>
+<a href="#page-rank">Page Rank</a>
+</li>
+<li>
+<a href="#connected-components">Connected Components</a>
+</li>
+<li>
+<a href="#relational-query">Relational Query</a>
+</li>
+</ul>
+
+
+      <p>The following example programs showcase different applications of Stratosphere from simple word counting to graph algorithms.
+The code samples illustrate the use of <a href="scala_api_guide.html">Stratosphere&#39;s Scala API</a>. </p>
+
+<p>The full source code of the following and more examples can be found in the <a href="https://github.com/apache/incubator-flink/tree/ca2b287a7a78328ebf43766b9fdf39b56fb5fd4f/stratosphere-examples/stratosphere-scala-examples">stratosphere-scala-examples</a> module.</p>
+
+<h1 id="word-count">Word Count</h1>
+
+<p>WordCount is the &quot;Hello World&quot; of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the texts are split into individual words. Second, the words are grouped and counted.</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// read input data</span>
+<span class="k">val</span> <span class="n">input</span> <span class="k">=</span> <span class="nc">TextFile</span><span class="o">(</span><span class="n">textInput</span><span class="o">)</span>
+
+<span class="c1">// tokenize words</span>
+<span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">flatMap</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)</span> <span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span> <span class="o">}</span> <span class="o">}</span>
+
+<span class="c1">// count by word</span>
+<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">words</span><span class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="k">_</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">word</span> <span class="o">}</span>
+  <span class="o">.</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">(</span><span class="n">w1</span><span class="o">,</span> <span class="n">w2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">w1</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">w1</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="n">w2</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">counts</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="n">wordsOutput</span><span class="o">,</span> <span class="nc">CsvOutputFormat</span><span class="o">())</span>
+</code></pre></div>
+<p>The <a href="https://github.com/apache/incubator-flink/blob/master//stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/wordcount/WordCount.scala">WordCount example</a> implements the algorithm described above with input parameters: <code>&lt;degree of parallelism&gt;, &lt;text input path&gt;, &lt;output path&gt;</code>. As test data, any text file will do.</p>
+
+<h1 id="page-rank">Page Rank</h1>
+
+<p>The PageRank algorithm computes the &quot;importance&quot; of pages in a graph defined by links, which point from one page to another page. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine which uses the importance of webpages to rank the results of search queries.</p>
+
+<p>In this simple example, PageRank is implemented with a <a href="java_api_guide.html#iterations">bulk iteration</a> and a fixed number of iterations.</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// case classes so we have named fields</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">PageWithRank</span><span class="o">(</span><span class="n">pageId</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">rank</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">Edge</span><span class="o">(</span><span class="n">from</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">to</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">transitionProbability</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
+
+<span class="c1">// constants for the page rank formula</span>
+<span class="k">val</span> <span class="n">dampening</span> <span class="k">=</span> <span class="mf">0.85</span>
+<span class="k">val</span> <span class="n">randomJump</span> <span class="k">=</span> <span class="o">(</span><span class="mf">1.0</span> <span class="o">-</span> <span class="n">dampening</span><span class="o">)</span> <span class="o">/</span> <span class="nc">NUM_VERTICES</span>
+<span class="k">val</span> <span class="n">initialRank</span> <span class="k">=</span> <span class="mf">1.0</span> <span class="o">/</span> <span class="nc">NUM_VERTICES</span>
+
+<span class="c1">// read inputs</span>
+<span class="k">val</span> <span class="n">pages</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="n">verticesPath</span><span class="o">,</span> <span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Long</span><span class="o">]())</span>
+<span class="k">val</span> <span class="n">edges</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="n">edgesPath</span><span class="o">,</span> <span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Edge</span><span class="o">]())</span>
+
+<span class="c1">// assign initial rank</span>
+<span class="k">val</span> <span class="n">pagesWithRank</span> <span class="k">=</span> <span class="n">pages</span> <span class="n">map</span> <span class="o">{</span> <span class="n">p</span> <span class="k">=&gt;</span> <span class="nc">PageWithRank</span><span class="o">(</span><span class="n">p</span><span class="o">,</span> <span class="n">initialRank</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="c1">// the iterative computation</span>
+<span class="k">def</span> <span class="n">computeRank</span><span class="o">(</span><span class="n">ranks</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">PageWithRank</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+
+    <span class="c1">// send rank to neighbors</span>
+    <span class="k">val</span> <span class="n">ranksForNeighbors</span> <span class="k">=</span> <span class="n">ranks</span> <span class="n">join</span> <span class="n">edges</span>
+        <span class="n">where</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">pageId</span> <span class="o">}</span> <span class="n">isEqualTo</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">from</span> <span class="o">}</span>
+        <span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="n">p</span><span class="o">,</span> <span class="n">e</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">e</span><span class="o">.</span><span class="n">to</span><span class="o">,</span> <span class="n">p</span><span class="o">.</span><span class="n">rank</span> <span class="o">*</span> <span class="n">e</span><span class="o">.</span><span class="n">transitionProbability</span><span class="o">)</span> <span class="o">}</span>
+
+    <span class="c1">// gather ranks per vertex and apply page rank formula</span>
+    <span class="n">ranksForNeighbors</span> <span class="o">.</span><span class="n">groupBy</span> <span class="o">{</span> <span class="k">case</span> <span class="o">(</span><span class="n">node</span><span class="o">,</span> <span class="n">rank</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">node</span> <span class="o">}</span>
+                      <span class="o">.</span><span class="n">reduce</span> <span class="o">{</span> <span class="o">(</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">a</span><span class="o">.</span><span class="n">_1</span><span class="o">,</span> <span class="n">a</span><span class="o">.</span><span class="n">_2</span> <span class="o">+</span> <span class="n">b</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> <span class="o">}</span>
+                      <span class="o">.</span><span class="n">map</span> <span class="o">{</span><span class="k">case</span> <span class="o">(</span><span class="n">node</span><span class="o">,</span> <span class="n">rank</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">PageWithRank</span><span class="o">(</span><span class="n">node</span><span class="o">,</span> <span class="n">rank</span> <span class="o">*</span> <span class="n">dampening</span> <span class="o">+</span> <span class="n">randomJump</span><span class="o">)</span> <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="c1">// invoke iteratively</span>
+<span class="k">val</span> <span class="n">finalRanks</span> <span class="k">=</span> <span class="n">pagesWithRank</span><span class="o">.</span><span class="n">iterate</span><span class="o">(</span><span class="n">numIterations</span><span class="o">,</span> <span class="n">computeRank</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">finalRanks</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="n">outputPath</span><span class="o">,</span> <span class="nc">CsvOutputFormat</span><span class="o">())</span>
+</code></pre></div>
+<p>The <a href="https://github.com/apache/incubator-flink/blob/master//stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/graph/PageRank.scala">PageRank program</a> implements the above example.
+It requires the following parameters to run: <code>&lt;pages input path&gt;, &lt;link input path&gt;, &lt;output path&gt;, &lt;num pages&gt;, &lt;num iterations&gt;</code>.</p>
+
+<p>Input files are plain text files and must be formatted as follows:</p>
+
+<ul>
+<li>Pages are represented as (long) IDs separated by new-line characters.
+
+<ul>
+<li>For example <code>&quot;1\n2\n12\n42\n63\n&quot;</code> gives five pages with IDs 1, 2, 12, 42, and 63.</li>
+</ul></li>
+<li>Links are represented as pairs of page IDs which are separated by space characters. Links are separated by new-line characters:
+
+<ul>
+<li>For example <code>&quot;1 2\n2 12\n1 12\n42 63\n&quot;</code> gives four (directed) links (1)-&gt;(2), (2)-&gt;(12), (1)-&gt;(12), and (42)-&gt;(63).</li>
+</ul></li>
+</ul>
+
+<p>For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).</p>
+
+<h1 id="connected-components">Connected Components</h1>
+
+<p>The Connected Components algorithm identifies parts of a larger graph which are connected by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor, if it is smaller than its own component ID.</p>
+
+<p>This implementation uses a <a href="iterations.html">delta iteration</a>: Vertices that have not changed their component id do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// define case classes</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">VertexWithComponent</span><span class="o">(</span><span class="n">vertex</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">componentId</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">Edge</span><span class="o">(</span><span class="n">from</span><span class="k">:</span> <span class="kt">Long</span><span class="o">,</span> <span class="n">to</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span>
+
+<span class="c1">// get input data</span>
+<span class="k">val</span> <span class="n">vertices</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="n">verticesPath</span><span class="o">,</span> <span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Long</span><span class="o">]())</span>
+<span class="k">val</span> <span class="n">directedEdges</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="n">edgesPath</span><span class="o">,</span> <span class="nc">CsvInputFormat</span><span class="o">[</span><span class="kt">Edge</span><span class="o">]())</span>
+
+<span class="c1">// assign each vertex its own ID as component ID</span>
+<span class="k">val</span> <span class="n">initialComponents</span> <span class="k">=</span> <span class="n">vertices</span> <span class="n">map</span> <span class="o">{</span> <span class="n">v</span> <span class="k">=&gt;</span> <span class="nc">VertexWithComponent</span><span class="o">(</span><span class="n">v</span><span class="o">,</span> <span class="n">v</span><span class="o">)</span> <span class="o">}</span>
+<span class="k">val</span> <span class="n">undirectedEdges</span> <span class="k">=</span> <span class="n">directedEdges</span> <span class="n">flatMap</span> <span class="o">{</span> <span class="n">e</span> <span class="k">=&gt;</span> <span class="nc">Seq</span><span class="o">(</span><span class="n">e</span><span class="o">,</span> <span class="nc">Edge</span><span class="o">(</span><span class="n">e</span><span class="o">.</span><span class="n">to</span><span class="o">,</span> <span class="n">e</span><span class="o">.</span><span class="n">from</span><span class="o">))</span> <span class="o">}</span>
+
+<span class="k">def</span> <span class="n">propagateComponent</span><span class="o">(</span><span class="n">s</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">VertexWithComponent</span><span class="o">],</span> <span class="n">ws</span><span class="k">:</span> <span class="kt">DataSet</span><span class="o">[</span><span class="kt">VertexWithComponent</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+  <span class="k">val</span> <span class="n">allNeighbors</span> <span class="k">=</span> <span class="n">ws</span> <span class="n">join</span> <span class="n">undirectedEdges</span>
+        <span class="n">where</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">vertex</span> <span class="o">}</span> <span class="n">isEqualTo</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">from</span> <span class="o">}</span>
+        <span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="n">v</span><span class="o">,</span> <span class="n">e</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">VertexWithComponent</span><span class="o">(</span><span class="n">e</span><span class="o">.</span><span class="n">to</span><span class="o">,</span> <span class="n">v</span><span class="o">.</span><span class="n">componentId</span> <span class="o">)</span> <span class="o">}</span>
+
+    <span class="k">val</span> <span class="n">minNeighbors</span> <span class="k">=</span> <span class="n">allNeighbors</span> <span class="n">groupBy</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">vertex</span> <span class="o">}</span> <span class="n">reduceGroup</span> <span class="o">{</span> <span class="n">cs</span> <span class="k">=&gt;</span> <span class="n">cs</span> <span class="n">minBy</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">componentId</span> <span class="o">}</span> <span class="o">}</span>
+
+    <span class="c1">// updated solution elements == new workset</span>
+    <span class="k">val</span> <span class="n">s1</span> <span class="k">=</span> <span class="n">s</span> <span class="n">join</span> <span class="n">minNeighbors</span>
+        <span class="n">where</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">vertex</span> <span class="o">}</span> <span class="n">isEqualTo</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">vertex</span> <span class="o">}</span>
+        <span class="n">flatMap</span> <span class="o">{</span> <span class="o">(</span><span class="n">curr</span><span class="o">,</span> <span class="n">candidate</span><span class="o">)</span> <span class="k">=&gt;</span>
+            <span class="k">if</span> <span class="o">(</span><span class="n">candidate</span><span class="o">.</span><span class="n">componentId</span> <span class="o">&lt;</span> <span class="n">curr</span><span class="o">.</span><span class="n">componentId</span><span class="o">)</span> <span class="nc">Some</span><span class="o">(</span><span class="n">candidate</span><span class="o">)</span> <span class="k">else</span> <span class="nc">None</span>
+        <span class="o">}</span>
+
+  <span class="o">(</span><span class="n">s1</span><span class="o">,</span> <span class="n">s1</span><span class="o">)</span>
+<span class="o">}</span>
+
+<span class="k">val</span> <span class="n">components</span> <span class="k">=</span> <span class="n">initialComponents</span><span class="o">.</span><span class="n">iterateWithDelta</span><span class="o">(</span><span class="n">initialComponents</span><span class="o">,</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">vertex</span> <span class="o">},</span> <span class="n">propagateComponent</span><span class="o">,</span>
+                    <span class="n">maxIterations</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">components</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="n">componentsOutput</span><span class="o">,</span> <span class="nc">CsvOutputFormat</span><span class="o">())</span>
+</code></pre></div>
+<p>The <a href="https://github.com/apache/incubator-flink/blob/master//stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/graph/ConnectedComponents.scala">ConnectedComponents program</a> implements the above example. It requires the following parameters to run: <code>&lt;vertex input path&gt;, &lt;edge input path&gt;, &lt;output path&gt;, &lt;max num iterations&gt;</code>.</p>
+
+<p>Input files are plain text files and must be formatted as follows:</p>
+
+<ul>
+<li>Vertices are represented as IDs and separated by new-line characters.
+
+<ul>
+<li>For example <code>&quot;1\n2\n12\n42\n63\n&quot;</code> gives five vertices (1), (2), (12), (42), and (63).</li>
+</ul></li>
+<li>Edges are represented as pairs of vertex IDs which are separated by space characters. Edges are separated by new-line characters:
+
+<ul>
+<li>For example <code>&quot;1 2\n2 12\n1 12\n42 63\n&quot;</code> gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63).</li>
+</ul></li>
+</ul>
+
+<h1 id="relational-query">Relational Query</h1>
+
+<p>The Relational Query example assumes two tables, one with <code>orders</code> and the other with <code>lineitems</code> as specified by the <a href="http://www.tpc.org/tpch/">TPC-H decision support benchmark</a>. TPC-H is a standard benchmark in the database industry. See below for instructions on how to generate the input data.</p>
+
+<p>The example implements the following SQL query.</p>
+<div class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">l_orderkey</span><span class="p">,</span> <span class="n">o_shippriority</span><span class="p">,</span> <span class="k">sum</span><span class="p">(</span><span class="n">l_extendedprice</span><span class="p">)</span> <span class="k">as</span> <span class="n">revenue</span>
+    <span class="k">FROM</span> <span class="n">orders</span><span class="p">,</span> <span class="n">lineitem</span>
+<span class="k">WHERE</span> <span class="n">l_orderkey</span> <span class="o">=</span> <span class="n">o_orderkey</span>
+    <span class="k">AND</span> <span class="n">o_orderstatus</span> <span class="o">=</span> <span class="ss">&quot;F&quot;</span> 
+    <span class="k">AND</span> <span class="k">YEAR</span><span class="p">(</span><span class="n">o_orderdate</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1993</span>
+    <span class="k">AND</span> <span class="n">o_orderpriority</span> <span class="k">LIKE</span> <span class="ss">&quot;5%&quot;</span>
+<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">l_orderkey</span><span class="p">,</span> <span class="n">o_shippriority</span><span class="p">;</span>
+</code></pre></div>
+<p>The Stratosphere Scala program, which implements the above query, looks as follows.</p>
+<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// --- define some custom classes to address fields by name ---</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">Order</span><span class="o">(</span><span class="n">orderId</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">status</span><span class="k">:</span> <span class="kt">Char</span><span class="o">,</span> <span class="n">date</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">orderPriority</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">shipPriority</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">LineItem</span><span class="o">(</span><span class="n">orderId</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">extendedPrice</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
+<span class="k">case</span> <span class="k">class</span> <span class="nc">PrioritizedOrder</span><span class="o">(</span><span class="n">orderId</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">shipPriority</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">revenue</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
+
+<span class="k">val</span> <span class="n">orders</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="n">ordersInputPath</span><span class="o">,</span> <span class="nc">DelimitedInputFormat</span><span class="o">(</span><span class="n">parseOrder</span><span class="o">))</span>
+<span class="k">val</span> <span class="n">lineItems</span> <span class="k">=</span> <span class="nc">DataSource</span><span class="o">(</span><span class="n">lineItemsInput</span><span class="o">,</span> <span class="nc">DelimitedInputFormat</span><span class="o">(</span><span class="n">parseLineItem</span><span class="o">))</span>
+
+<span class="k">val</span> <span class="n">filteredOrders</span> <span class="k">=</span> <span class="n">orders</span> <span class="n">filter</span> <span class="o">{</span> <span class="n">o</span> <span class="k">=&gt;</span> <span class="n">o</span><span class="o">.</span><span class="n">status</span> <span class="o">==</span> <span class="sc">&#39;F&#39;</span> <span class="o">&amp;&amp;</span> <span class="n">o</span><span class="o">.</span><span class="n">date</span><span class="o">.</span><span class="n">substring</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="mi">4</span><span class="o">).</span><span class="n">toInt</span> <span class="o">&gt;</span> <span class="mi">1993</span> <span class="o">&amp;&amp;</span> <span class="n">o</span><span class="o">.</span><span class="n">orderPriority</span><span class="o">.</span><span class="n">startsWith</span><span class="o">(</span><span class="s">&quot;5&quot;</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="k">val</span> <span class="n">prioritizedItems</span> <span class="k">=</span> <span class="n">filteredOrders</span> <span class="n">join</span> <span class="n">lineItems</span>
+    <span class="n">where</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">orderId</span> <span class="o">}</span> <span class="n">isEqualTo</span> <span class="o">{</span> <span class="k">_</span><span class="o">.</span><span class="n">orderId</span> <span class="o">}</span> <span class="c1">// join on the orderIds</span>
+    <span class="n">map</span> <span class="o">{</span> <span class="o">(</span><span class="n">o</span><span class="o">,</span> <span class="n">li</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nc">PrioritizedOrder</span><span class="o">(</span><span class="n">o</span><span class="o">.</span><span class="n">orderId</span><span class="o">,</span> <span class="n">o</span><span class="o">.</span><span class="n">shipPriority</span><span class="o">,</span> <span class="n">li</span><span class="o">.</span><span class="n">extendedPrice</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="k">val</span> <span class="n">prioritizedOrders</span> <span class="k">=</span> <span class="n">prioritizedItems</span>
+    <span class="n">groupBy</span> <span class="o">{</span> <span class="n">pi</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">pi</span><span class="o">.</span><span class="n">orderId</span><span class="o">,</span> <span class="n">pi</span><span class="o">.</span><span class="n">shipPriority</span><span class="o">)</span> <span class="o">}</span> 
+    <span class="n">reduce</span> <span class="o">{</span> <span class="o">(</span><span class="n">po1</span><span class="o">,</span> <span class="n">po2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="n">po1</span><span class="o">.</span><span class="n">copy</span><span class="o">(</span><span class="n">revenue</span> <span class="k">=</span> <span class="n">po1</span><span class="o">.</span><span class="n">revenue</span> <span class="o">+</span> <span class="n">po2</span><span class="o">.</span><span class="n">revenue</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="k">val</span> <span class="n">output</span> <span class="k">=</span> <span class="n">prioritizedOrders</span><span class="o">.</span><span class="n">write</span><span class="o">(</span><span class="n">ordersOutput</span><span class="o">,</span> <span class="nc">CsvOutputFormat</span><span class="o">(</span><span class="n">formatOutput</span><span class="o">))</span>
+</code></pre></div>
+<p>The <a href="https://github.com/apache/incubator-flink/blob/master/stratosphere-examples/stratosphere-scala-examples/src/main/scala/eu/stratosphere/examples/scala/relational/RelationalQuery.scala">Relational Query program</a> implements the above query. It requires the following parameters to run: <code>&lt;orders input path&gt;, &lt;lineitem input path&gt;, &lt;output path&gt;, &lt;degree of parallelism&gt;</code>.</p>
+
+<p>The orders and lineitem files can be generated using the <a href="http://www.tpc.org/tpch/">TPC-H benchmark</a> suite&#39;s data generator tool (DBGEN).
+Take the following steps to generate arbitrarily large input files for the provided Stratosphere programs:</p>
+
+<ol>
+<li> Download and unpack DBGEN</li>
+<li> Make a copy of <em>makefile.suite</em> called <em>Makefile</em> and perform the following changes:</li>
+</ol>
+<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">DATABASE</span> <span class="o">=</span> DB2
+<span class="nv">MACHINE</span>  <span class="o">=</span> LINUX
+<span class="nv">WORKLOAD</span> <span class="o">=</span> TPCH
+<span class="nv">CC</span>       <span class="o">=</span> gcc
+</code></pre></div>
+<ol start="3">
+<li> Build DBGEN using <em>make</em></li>
+<li> Generate lineitem and orders relations using dbgen. A scale factor
+(-s) of 1 results in a generated data set with about 1 GB size.</li>
+</ol>
+<div class="highlight"><pre><code class="language-bash" data-lang="bash">./dbgen -T o -s 1
+</code></pre></div>
+
+      <div style="padding-top:30px" id="disqus_thread"></div>
+<script type="text/javascript">
+    /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
+    // Forum shortname; used below to build the host name of the embed script.
+    // Declared with `var` at script top level, so it is also a global.
+    var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname
+
+    /* * * DON'T EDIT BELOW THIS LINE * * */
+    // Loader: creates an async <script> element pointing at the forum's embed.js
+    // and appends it to <head>, falling back to <body> when no <head> is found.
+    (function() {
+        var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
+        // Explicit https: a protocol-relative URL inherits the page's scheme, so it
+        // would load over plain http on http pages and fail on file:// pages.
+        dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js';
+        (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
+    })();
+</script>
+<noscript>Please enable JavaScript to view the <a href="http://disqus.com/?ref_noscript">comments powered by Disqus.</a></noscript>
+<a href="http://disqus.com" class="dsq-brlink">comments powered by <span class="logo-disqus">Disqus</span></a>
+
+  </div>
+</div>
+
+     <div class="footer">
+
+<hr class="divider">
+
+<p><small>Apache Flink is an effort undergoing incubation at The Apache Software
+Foundation (ASF), sponsored by the Apache Incubator PMC. Incubation is
+required of all newly accepted projects until a further review indicates that
+the infrastructure, communications, and decision making process have
+stabilized in a manner consistent with other successful ASF projects. While
+incubation status is not necessarily a reflection of the completeness or
+stability of the code, it does indicate that the project has yet to be fully
+endorsed by the ASF.</small></p>
+
+<p><a href="http://incubator.apache.org/"><img src="/img/apache-incubator-logo.png" alt="Incubator Logo"></a></p>
+
+<p class="text-center"><a href="/privacy-policy.html">Privacy Policy</a></p>
+
+      </div>
+    </div>
+
+    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.0/jquery.min.js"></script>
+    <script src="/js/bootstrap.min.js"></script>
+
+    <script>
+      // Google Analytics loader. The immediately-invoked function:
+      //   - records the name 'ga' in window.GoogleAnalyticsObject,
+      //   - defines window.ga (unless it already exists) as a stub that pushes
+      //     its arguments onto ga.q,
+      //   - stores the current timestamp in ga.l,
+      //   - creates an async <script> element for analytics.js and inserts it
+      //     before the first <script> element in the document.
+      // The URL uses explicit https: a protocol-relative URL inherits the page's
+      // scheme, so it would load over plain http on http pages and fail on
+      // file:// pages.
+      (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
+      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+      })(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
+
+      // Queue the tracker creation for this property ID, then a pageview hit.
+      ga('create', 'UA-52545728-1', 'auto');
+      ga('send', 'pageview');
+
+    </script>
+
+  </body>
+</html>