You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/18 14:00:20 UTC

[20/84] [abbrv] flink-web git commit: Updated Flink site

http://git-wip-us.apache.org/repos/asf/flink-web/blob/d8883b04/content/news/2015/02/09/streaming-example.html
----------------------------------------------------------------------
diff --git a/content/news/2015/02/09/streaming-example.html b/content/news/2015/02/09/streaming-example.html
deleted file mode 100644
index 198d0d7..0000000
--- a/content/news/2015/02/09/streaming-example.html
+++ /dev/null
@@ -1,846 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-  <head>
-    <meta charset="utf-8">
-    <meta http-equiv="X-UA-Compatible" content="IE=edge">
-    <meta name="viewport" content="width=device-width, initial-scale=1">
-    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
-    <title>Apache Flink: Introducing Flink Streaming</title>
-    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
-    <link rel="icon" href="/favicon.ico" type="image/x-icon">
-
-    <!-- Bootstrap -->
-    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
-    <link rel="stylesheet" href="/css/flink.css">
-    <link rel="stylesheet" href="/css/syntax.css">
-
-    <!-- Blog RSS feed -->
-    <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />
-
-    <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
-    <!-- We need to load Jquery in the header for custom google analytics event tracking-->
-    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
-
-    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
-    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
-    <!--[if lt IE 9]>
-      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
-      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
-    <![endif]-->
-  </head>
-  <body>  
-    
-
-  <!-- Top navbar. -->
-    <nav class="navbar navbar-default navbar-fixed-top">
-      <div class="container">
-        <!-- The logo. -->
-        <div class="navbar-header">
-          <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
-            <span class="icon-bar"></span>
-            <span class="icon-bar"></span>
-            <span class="icon-bar"></span>
-          </button>
-          <div class="navbar-logo">
-            <a href="/">
-              <img alt="Apache Flink" src="/img/navbar-brand-logo.jpg" width="78px" height="40px">
-            </a>
-          </div>
-        </div><!-- /.navbar-header -->
-
-        <!-- The navigation links. -->
-        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
-          <ul class="nav navbar-nav">
-
-            <!-- Overview -->
-            <li><a href="/index.html">Overview</a></li>
-
-            <!-- Features -->
-            <li><a href="/features.html">Features</a></li>
-
-            <!-- Downloads -->
-            <li><a href="/downloads.html">Downloads</a></li>
-
-            <!-- FAQ -->
-            <li><a href="/faq.html">FAQ</a></li>
-
-
-            <!-- Quickstart -->
-            <li class="dropdown">
-              <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false"><small><span class="glyphicon glyphicon-new-window"></span></small> Quickstart <span class="caret"></span></a>
-              <ul class="dropdown-menu" role="menu">
-                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/setup_quickstart.html">Setup</a></li>
-                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/run_example_quickstart.html">Example: Wikipedia Edit Stream</a></li>
-                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/java_api_quickstart.html">Java API</a></li>
-                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/scala_api_quickstart.html">Scala API</a></li>
-              </ul>
-            </li>
-
-            <!-- Documentation -->
-            <li class="dropdown">
-              <a href="" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false"><small><span class="glyphicon glyphicon-new-window"></span></small> Documentation <span class="caret"></span></a>
-              <ul class="dropdown-menu" role="menu">
-                <!-- Latest stable release -->
-                <li role="presentation" class="dropdown-header"><strong>Latest Release</strong> (Stable)</li>
-                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1">1.1 Documentation</a></li>
-                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java" class="active">1.1 Javadocs</a></li>
-                <!--<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/api/scala/index.html" class="active">1.1 ScalaDocs</a></li> -->
-
-                <!-- Snapshot docs -->
-                <li class="divider"></li>
-                <li role="presentation" class="dropdown-header"><strong>Snapshot</strong> (Development)</li>
-                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.2">1.2 Documentation</a></li>
-                <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java" class="active">1.2 Javadocs</a></li>
-                <!--<li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.2/api/scala/index.html" class="active">1.2 ScalaDocs</a></li> -->
-
-                <!-- Wiki -->
-                <li class="divider"></li>
-                <li><a href="/visualizer/"><small><span class="glyphicon glyphicon-new-window"></span></small> Plan Visualizer</a></li>
-                <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li>
-              </ul>
-            </li>
-
-          </ul>
-
-          <ul class="nav navbar-nav navbar-right">
-            <!-- Blog -->
-            <li class=" active hidden-md hidden-sm"><a href="/blog/">Blog</a></li>
-
-            <li class="dropdown hidden-md hidden-sm">
-              <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Community <span class="caret"></span></a>
-              <ul class="dropdown-menu" role="menu">
-                <!-- Community -->
-                <li role="presentation" class="dropdown-header"><strong>Community</strong></li>
-                <li><a href="/community.html#mailing-lists">Mailing Lists</a></li>
-                <li><a href="/community.html#irc">IRC</a></li>
-                <li><a href="/community.html#stack-overflow">Stack Overflow</a></li>
-                <li><a href="/community.html#issue-tracker">Issue Tracker</a></li>
-                <li><a href="/community.html#third-party-packages">Third Party Packages</a></li>
-                <li><a href="/community.html#source-code">Source Code</a></li>
-                <li><a href="/community.html#people">People</a></li>
-                <li><a href="/poweredby.html">Powered by Flink</a></li>
-
-                <!-- Contribute -->
-                <li class="divider"></li>
-                <li role="presentation" class="dropdown-header"><strong>Contribute</strong></li>
-                <li><a href="/how-to-contribute.html">How to Contribute</a></li>
-                <li><a href="/contribute-code.html">Contribute Code</a></li>
-                <li><a href="/contribute-documentation.html">Contribute Documentation</a></li>
-                <li><a href="/improve-website.html">Improve the Website</a></li>
-                <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals"><small><span class="glyphicon glyphicon-new-window"></span></small> Flink Improvement Proposals (Design Docs)</a></li>
-              </ul>
-            </li>
-
-            <li class="dropdown hidden-md hidden-sm">
-              <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-expanded="false">Project <span class="caret"></span></a>
-              <ul class="dropdown-menu" role="menu">
-                <!-- Project -->
-                <li role="presentation" class="dropdown-header"><strong>Project</strong></li>
-                <li><a href="/slides.html">Slides</a></li>
-                <li><a href="/material.html">Material</a></li>
-                <li><a href="https://twitter.com/apacheflink"><small><span class="glyphicon glyphicon-new-window"></span></small> Twitter</a></li>
-                <li><a href="https://github.com/apache/flink"><small><span class="glyphicon glyphicon-new-window"></span></small> GitHub</a></li>
-                <li><a href="https://cwiki.apache.org/confluence/display/FLINK/Apache+Flink+Home"><small><span class="glyphicon glyphicon-new-window"></span></small> Wiki</a></li>
-              </ul>
-            </li>
-          </ul>
-        </div><!-- /.navbar-collapse -->
-      </div><!-- /.container -->
-    </nav>
-
-
-    <!-- Main content. -->
-    <div class="container">
-      
-
-<div class="row">
-  <div class="col-sm-8 col-sm-offset-2">
-    <div class="row">
-      <h1>Introducing Flink Streaming</h1>
-
-      <article>
-        <p>09 Feb 2015</p>
-
-<p>This post is the first of a series of blog posts on Flink Streaming,
-the recent addition to Apache Flink that makes it possible to analyze
-continuous data sources in addition to static files. Flink Streaming
-uses the pipelined Flink engine to process data streams in real time
-and offers a new API including definition of flexible windows.</p>
-
-<p>In this post, we go through an example that uses the Flink Streaming
-API to compute statistics on stock market data that arrive
-continuously and combine the stock market data with Twitter streams.
-See the <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html">Streaming Programming
-Guide</a> for a
-detailed presentation of the Streaming API.</p>
-
-<p>First, we read a bunch of stock price streams and combine them into
-one stream of market data. We apply several transformations on this
-market data stream, like rolling aggregations per stock. Then we emit
-price warning alerts when the prices are rapidly changing. Moving 
-towards more advanced features, we compute rolling correlations
-between the market data streams and a Twitter stream with stock mentions.</p>
-
-<p>For running the example implementation please use the <em>0.9-SNAPSHOT</em> 
-version of Flink as a dependency. The full example code base can be 
-found <a href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala">here</a> in Scala and <a href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java">here</a> in Java7.</p>
-
-<p><a href="#top"></a></p>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="reading-from-multiple-inputs">Reading from multiple inputs</h2>
-
-<p>First, let us create the stream of stock prices:</p>
-
-<ol>
-  <li>Read a socket stream of stock prices</li>
-  <li>Parse the text in the stream to create a stream of <code>StockPrice</code> objects</li>
-  <li>Add four other sources tagged with the stock symbol.</li>
-  <li>Finally, merge the streams to create a unified stream.</li>
-</ol>
-
-<p><img alt="Reading from multiple inputs" src="/img/blog/blog_multi_input.png" width="70%" class="img-responsive center-block" /></p>
-
-<div class="codetabs">
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span>
-
-  <span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
-
-  <span class="c1">//Read from a socket stream at map it to StockPrice objects</span>
-  <span class="k">val</span> <span class="n">socketStockStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="mi">9999</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span class="o">{</span>
-    <span class="k">val</span> <span class="n">split</span> <span class="k">=</span> <span class="n">x</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">)</span>
-    <span class="nc">StockPrice</span><span class="o">(</span><span class="n">split</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">split</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">toDouble</span><span class="o">)</span>
-  <span class="o">})</span>
-
-  <span class="c1">//Generate other stock streams</span>
-  <span class="k">val</span> <span class="nc">SPX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">)(</span><span class="mi">10</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span>
-  <span class="k">val</span> <span class="nc">FTSE_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">&quot;FTSE&quot;</span><span class="o">)(</span><span class="mi">20</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span>
-  <span class="k">val</span> <span class="nc">DJI_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">&quot;DJI&quot;</span><span class="o">)(</span><span class="mi">30</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span>
-  <span class="k">val</span> <span class="nc">BUX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">&quot;BUX&quot;</span><span class="o">)(</span><span class="mi">40</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span>
-
-  <span class="c1">//Merge all stock streams together</span>
-  <span class="k">val</span> <span class="n">stockStream</span> <span class="k">=</span> <span class="n">socketStockStream</span><span class="o">.</span><span class="n">merge</span><span class="o">(</span><span class="nc">SPX_Stream</span><span class="o">,</span> <span class="nc">FTSE_Stream</span><span class="o">,</span> 
-    <span class="nc">DJI_Stream</span><span class="o">,</span> <span class="nc">BUX_Stream</span><span class="o">)</span>
-
-  <span class="n">stockStream</span><span class="o">.</span><span class="n">print</span><span class="o">()</span>
-
-  <span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">&quot;Stock stream&quot;</span><span class="o">)</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-  <div data-lang="java7">
-
-    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-
-    <span class="kd">final</span> <span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span>
-        <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
-
-    <span class="c1">//Read from a socket stream at map it to StockPrice objects</span>
-    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">socketStockStream</span> <span class="o">=</span> <span class="n">env</span>
-            <span class="o">.</span><span class="na">socketTextStream</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span>
-            <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">StockPrice</span><span class="o">&gt;()</span> <span class="o">{</span>
-                <span class="kd">private</span> <span class="n">String</span><span class="o">[]</span> <span class="n">tokens</span><span class="o">;</span>
-
-                <span class="nd">@Override</span>
-                <span class="kd">public</span> <span class="n">StockPrice</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-                    <span class="n">tokens</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">);</span>
-                    <span class="k">return</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span>
-                        <span class="n">Double</span><span class="o">.</span><span class="na">parseDouble</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">1</span><span class="o">]));</span>
-                <span class="o">}</span>
-            <span class="o">});</span>
-
-    <span class="c1">//Generate other stock streams</span>
-    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">SPX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">,</span> <span class="mi">10</span><span class="o">));</span>
-    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">FTSE_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span class="mi">20</span><span class="o">));</span>
-    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">DJI_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;DJI&quot;</span><span class="o">,</span> <span class="mi">30</span><span class="o">));</span>
-    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">BUX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;BUX&quot;</span><span class="o">,</span> <span class="mi">40</span><span class="o">));</span>
-
-    <span class="c1">//Merge all stock streams together</span>
-    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">stockStream</span> <span class="o">=</span> <span class="n">socketStockStream</span>
-        <span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">SPX_stream</span><span class="o">,</span> <span class="n">FTSE_stream</span><span class="o">,</span> <span class="n">DJI_stream</span><span class="o">,</span> <span class="n">BUX_stream</span><span class="o">);</span>
-
-    <span class="n">stockStream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
-
-    <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Stock stream&quot;</span><span class="o">);</span></code></pre></div>
-
-  </div>
-</div>
-
-<p>See
-<a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#data-sources">here</a>
-on how you can create streaming sources for Flink Streaming
-programs. Flink, of course, has support for reading in streams from
-<a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html">external
-sources</a>
-such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake
-of this example, the data streams are simply generated using the
-<code>generateStock</code> method:</p>
-
-<div class="codetabs">
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">symbols</span> <span class="k">=</span> <span class="nc">List</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">,</span> <span class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span class="s">&quot;DJI&quot;</span><span class="o">,</span> <span class="s">&quot;DJT&quot;</span><span class="o">,</span> <span class="s">&quot;BUX&quot;</span><span class="o">,</span> <span class="s">&quot;DAX&quot;</span><span class="o">,</span> <span class="s">&quot;GOOG&quot;</span><span class="o">)</span>
-
-<span class="k">case</span> <span class="k">class</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">price</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
-
-<span class="k">def</span> <span class="n">generateStock</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">)(</span><span class="n">sigma</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
-  <span class="k">var</span> <span class="n">price</span> <span class="k">=</span> <span class="mf">1000.</span>
-  <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span>
-    <span class="n">price</span> <span class="k">=</span> <span class="n">price</span> <span class="o">+</span> <span class="nc">Random</span><span class="o">.</span><span class="n">nextGaussian</span> <span class="o">*</span> <span class="n">sigma</span>
-    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">))</span>
-    <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">))</span>
-  <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-  <div data-lang="java7">
-
-    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">SYMBOLS</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span>
-    <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">,</span> <span class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span class="s">&quot;DJI&quot;</span><span class="o">,</span> <span class="s">&quot;DJT&quot;</span><span class="o">,</span> <span class="s">&quot;BUX&quot;</span><span class="o">,</span> <span class="s">&quot;DAX&quot;</span><span class="o">,</span> <span class="s">&quot;GOOG&quot;</span><span class="o">));</span>
-
-<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">StockPrice</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
-
-    <span class="kd">public</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span>
-    <span class="kd">public</span> <span class="n">Double</span> <span class="n">price</span><span class="o">;</span>
-
-    <span class="kd">public</span> <span class="nf">StockPrice</span><span class="o">()</span> <span class="o">{</span>
-    <span class="o">}</span>
-
-    <span class="kd">public</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Double</span> <span class="n">price</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span>
-        <span class="k">this</span><span class="o">.</span><span class="na">price</span> <span class="o">=</span> <span class="n">price</span><span class="o">;</span>
-    <span class="o">}</span>
-
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="n">String</span> <span class="nf">toString</span><span class="o">()</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="s">&quot;StockPrice{&quot;</span> <span class="o">+</span>
-                <span class="s">&quot;symbol=&#39;&quot;</span> <span class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span class="sc">&#39;\&#39;&#39;</span> <span class="o">+</span>
-                <span class="s">&quot;, count=&quot;</span> <span class="o">+</span> <span class="n">price</span> <span class="o">+</span>
-                <span class="sc">&#39;}&#39;</span><span class="o">;</span>
-    <span class="o">}</span>
-<span class="o">}</span>
-
-<span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">StockSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="o">{</span>
-
-    <span class="kd">private</span> <span class="n">Double</span> <span class="n">price</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">sigma</span><span class="o">;</span>
-
-    <span class="kd">public</span> <span class="nf">StockSource</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">sigma</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span>
-        <span class="k">this</span><span class="o">.</span><span class="na">sigma</span> <span class="o">=</span> <span class="n">sigma</span><span class="o">;</span>
-    <span class="o">}</span>
-
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-        <span class="n">price</span> <span class="o">=</span> <span class="n">DEFAULT_PRICE</span><span class="o">;</span>
-        <span class="n">Random</span> <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span>
-
-        <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span>
-            <span class="n">price</span> <span class="o">=</span> <span class="n">price</span> <span class="o">+</span> <span class="n">random</span><span class="o">.</span><span class="na">nextGaussian</span><span class="o">()</span> <span class="o">*</span> <span class="n">sigma</span><span class="o">;</span>
-            <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">));</span>
-            <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">));</span>
-        <span class="o">}</span>
-    <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-</div>
-
-<p>To read from the text socket stream please make sure that you have a
-socket running. For the sake of the example executing the following
-command in a terminal does the job. You can get
-<a href="http://netcat.sourceforge.net/">netcat</a> here if it is not available
-on your machine.</p>
-
-<div class="highlight"><pre><code>nc -lk 9999
-</code></pre></div>
-
-<p>If we execute the program from our IDE we see the system the
-stock prices being generated:</p>
-
-<div class="highlight"><pre><code>INFO    Job execution switched to status RUNNING.
-INFO    Socket Stream(1/1) switched to SCHEDULED 
-INFO    Socket Stream(1/1) switched to DEPLOYING
-INFO    Custom Source(1/1) switched to SCHEDULED 
-INFO    Custom Source(1/1) switched to DEPLOYING
-\u2026
-1&gt; StockPrice{symbol='SPX', count=1011.3405732645239}
-2&gt; StockPrice{symbol='SPX', count=1018.3381290039248}
-1&gt; StockPrice{symbol='DJI', count=1036.7454894073978}
-3&gt; StockPrice{symbol='DJI', count=1135.1170217478427}
-3&gt; StockPrice{symbol='BUX', count=1053.667523187687}
-4&gt; StockPrice{symbol='BUX', count=1036.552601487263}
-</code></pre></div>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="window-aggregations">Window aggregations</h2>
-
-<p>We first compute aggregations on time-based windows of the
-data. Flink provides <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html">flexible windowing semantics</a> where windows can
-also be defined based on count of records or any custom user defined
-logic.</p>
-
-<p>We partition our stream into windows of 10 seconds and slide the
-window every 5 seconds. We compute three statistics every 5 seconds.
-The first is the minimum price of all stocks, the second produces
-maximum price per stock, and the third is the mean stock price 
-(using a map window function). Aggregations and groupings can be
-performed on named fields of POJOs, making the code more readable.</p>
-
-<p><img alt="Basic windowing aggregations" src="/img/blog/blog_basic_window.png" width="70%" class="img-responsive center-block" /></p>
-
-<div class="codetabs">
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Define the desired time window</span>
-<span class="k">val</span> <span class="n">windowedStream</span> <span class="k">=</span> <span class="n">stockStream</span>
-  <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
-
-<span class="c1">//Compute some simple statistics on a rolling window</span>
-<span class="k">val</span> <span class="n">lowest</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">minBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">)</span>
-<span class="k">val</span> <span class="n">maxByStock</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="n">maxBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">)</span>
-<span class="k">val</span> <span class="n">rollingMean</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">mean</span> <span class="k">_</span><span class="o">)</span>
-
-<span class="c1">//Compute the mean of a window</span>
-<span class="k">def</span> <span class="n">mean</span><span class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
-  <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span>
-    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">,</span> <span class="n">ts</span><span class="o">.</span><span class="n">foldLeft</span><span class="o">(</span><span class="mi">0</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)(</span><span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">.</span><span class="n">price</span><span class="o">)</span> <span class="o">/</span> <span class="n">ts</span><span class="o">.</span><span class="n">size</span><span class="o">))</span>
-  <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="java7">
-
-    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Define the desired time window</span>
-<span class="n">WindowedDataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">stockStream</span>
-    <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span>
-    <span class="o">.</span><span class="na">every</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">));</span>
-
-<span class="c1">//Compute some simple statistics on a rolling window</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">lowest</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">minBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">maxByStock</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">maxBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">rollingMean</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowMean</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span>
-
-<span class="c1">//Compute the mean of a window</span>
-<span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WindowMean</span> <span class="kd">implements</span> 
-    <span class="n">WindowMapFunction</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">,</span> <span class="n">StockPrice</span><span class="o">&gt;</span> <span class="o">{</span>
-
-    <span class="kd">private</span> <span class="n">Double</span> <span class="n">sum</span> <span class="o">=</span> <span class="mf">0.0</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">String</span> <span class="n">symbol</span> <span class="o">=</span> <span class="s">&quot;&quot;</span><span class="o">;</span>
-
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> 
-        <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-
-        <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span><span class="n">s</span>
-            <span class="nf">for</span> <span class="o">(</span><span class="n">StockPrice</span> <span class="n">sp</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
-                <span class="n">sum</span> <span class="o">+=</span> <span class="n">sp</span><span class="o">.</span><span class="na">price</span><span class="o">;</span>
-                <span class="n">symbol</span> <span class="o">=</span> <span class="n">sp</span><span class="o">.</span><span class="na">symbol</span><span class="o">;</span>
-                <span class="n">count</span><span class="o">++;</span>
-            <span class="o">}</span>
-            <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">sum</span> <span class="o">/</span> <span class="n">count</span><span class="o">));</span>
-        <span class="o">}</span>
-    <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p>Let us note that to print a windowed stream one has to flatten it first,
-thus getting rid of the windowing logic. For example execute 
-<code>maxByStock.flatten().print()</code> to print the stream of maximum prices of
- the time windows by stock. For Scala <code>flatten()</code> is called implicitly
-when needed.</p>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="data-driven-windows">Data-driven windows</h2>
-
-<p>The most interesting event in the stream is when the price of a stock
-is changing rapidly. We can send a warning when a stock price changes
-more than 5% since the last warning. To do that, we use a delta-based window providing a
-threshold on when the computation will be triggered, a function to
-compute the difference and a default value with which the first record
-is compared. We also create a <code>Count</code> data type to count the warnings
-every 30 seconds.</p>
-
-<p><img alt="Data-driven windowing semantics" src="/img/blog/blog_data_driven.png" width="100%" class="img-responsive center-block" /></p>
-
-<div class="codetabs">
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">Count</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
-<span class="k">val</span> <span class="n">defaultPrice</span> <span class="k">=</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="mi">1000</span><span class="o">)</span>
-
-<span class="c1">//Use delta policy to create price change warnings</span>
-<span class="k">val</span> <span class="n">priceWarnings</span> <span class="k">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-  <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Delta</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="n">priceChange</span><span class="o">,</span> <span class="n">defaultPrice</span><span class="o">))</span>
-  <span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">sendWarning</span> <span class="k">_</span><span class="o">)</span>
-
-<span class="c1">//Count the number of warnings every half a minute</span>
-<span class="k">val</span> <span class="n">warningsPerStock</span> <span class="k">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span>
-  <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-  <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
-  <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">)</span>
-
-<span class="k">def</span> <span class="n">priceChange</span><span class="o">(</span><span class="n">p1</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">,</span> <span class="n">p2</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="o">{</span>
-  <span class="nc">Math</span><span class="o">.</span><span class="n">abs</span><span class="o">(</span><span class="n">p1</span><span class="o">.</span><span class="n">price</span> <span class="o">/</span> <span class="n">p2</span><span class="o">.</span><span class="n">price</span> <span class="o">-</span> <span class="mi">1</span><span class="o">)</span>
-<span class="o">}</span>
-
-<span class="k">def</span> <span class="n">sendWarning</span><span class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
-  <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">)</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="java7">
-
-    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Double</span> <span class="n">DEFAULT_PRICE</span> <span class="o">=</span> <span class="mi">1000</span><span class="o">.;</span>
-<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">StockPrice</span> <span class="n">DEFAULT_STOCK_PRICE</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="n">DEFAULT_PRICE</span><span class="o">);</span>
-
-<span class="c1">//Use delta policy to create price change warnings</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">priceWarnings</span> <span class="o">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Delta</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="k">new</span> <span class="n">DeltaFunction</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;()</span> <span class="o">{</span>
-        <span class="nd">@Override</span>
-        <span class="kd">public</span> <span class="kt">double</span> <span class="nf">getDelta</span><span class="o">(</span><span class="n">StockPrice</span> <span class="n">oldDataPoint</span><span class="o">,</span> <span class="n">StockPrice</span> <span class="n">newDataPoint</span><span class="o">)</span> <span class="o">{</span>
-            <span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">abs</span><span class="o">(</span><span class="n">oldDataPoint</span><span class="o">.</span><span class="na">price</span> <span class="o">-</span> <span class="n">newDataPoint</span><span class="o">.</span><span class="na">price</span><span class="o">);</span>
-        <span class="o">}</span>
-    <span class="o">},</span> <span class="n">DEFAULT_STOCK_PRICE</span><span class="o">))</span>
-<span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">SendWarning</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span>
-
-<span class="c1">//Count the number of warnings every half a minute</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">&gt;</span> <span class="n">warningsPerStock</span> <span class="o">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">&gt;()</span> <span class="o">{</span>
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span>
-    <span class="o">}</span>
-<span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span>
-
-<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Count</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
-    <span class="kd">public</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span>
-    <span class="kd">public</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">;</span>
-
-    <span class="kd">public</span> <span class="nf">Count</span><span class="o">()</span> <span class="o">{</span>
-    <span class="o">}</span>
-
-    <span class="kd">public</span> <span class="nf">Count</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span>
-        <span class="k">this</span><span class="o">.</span><span class="na">count</span> <span class="o">=</span> <span class="n">count</span><span class="o">;</span>
-    <span class="o">}</span>
-
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="n">String</span> <span class="nf">toString</span><span class="o">()</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="s">&quot;Count{&quot;</span> <span class="o">+</span>
-                <span class="s">&quot;symbol=&#39;&quot;</span> <span class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span class="sc">&#39;\&#39;&#39;</span> <span class="o">+</span>
-                <span class="s">&quot;, count=&quot;</span> <span class="o">+</span> <span class="n">count</span> <span class="o">+</span>
-                <span class="sc">&#39;}&#39;</span><span class="o">;</span>
-    <span class="o">}</span>
-<span class="o">}</span>
-
-<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">SendWarning</span> <span class="kd">implements</span> <span class="n">MapWindowFunction</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> 
-        <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-
-        <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
-            <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">next</span><span class="o">().</span><span class="na">symbol</span><span class="o">);</span>
-        <span class="o">}</span>
-    <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="combining-with-a-twitter-stream">Combining with a Twitter stream</h2>
-
-<p>Next, we will read a Twitter stream and correlate it with our stock
-price stream. Flink has support for connecting to <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/twitter.html">Twitter\u2019s
-API</a>
-but for the sake of this example we generate dummy tweet data.</p>
-
-<p><img alt="Social media analytics" src="/img/blog/blog_social_media.png" width="100%" class="img-responsive center-block" /></p>
-
-<div class="codetabs">
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Read a stream of tweets</span>
-<span class="k">val</span> <span class="n">tweetStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateTweets</span> <span class="k">_</span><span class="o">)</span>
-
-<span class="c1">//Extract the stock symbols</span>
-<span class="k">val</span> <span class="n">mentionedSymbols</span> <span class="k">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="n">tweet</span> <span class="k">=&gt;</span> <span class="n">tweet</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span>
-  <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">toUpperCase</span><span class="o">())</span>
-  <span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">contains</span><span class="o">(</span><span class="k">_</span><span class="o">))</span>
-
-<span class="c1">//Count the extracted symbols</span>
-<span class="k">val</span> <span class="n">tweetsPerStock</span> <span class="k">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span>
-  <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-  <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
-  <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">)</span>
-
-<span class="k">def</span> <span class="n">generateTweets</span><span class="o">(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
-  <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span>
-    <span class="k">val</span> <span class="n">s</span> <span class="k">=</span> <span class="k">for</span> <span class="o">(</span><span class="n">i</span> <span class="k">&lt;-</span> <span class="mi">1</span> <span class="n">to</span> <span class="mi">3</span><span class="o">)</span> <span class="k">yield</span> <span class="o">(</span><span class="n">symbols</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">size</span><span class="o">)))</span>
-    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="n">mkString</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span>
-    <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">500</span><span class="o">))</span>
-  <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="java7">
-
-    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Read a stream of tweets</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">tweetStream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TweetSource</span><span class="o">());</span>
-
-<span class="c1">//Extract the stock symbols</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">mentionedSymbols</span> <span class="o">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span>
-    <span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-        <span class="n">String</span><span class="o">[]</span> <span class="n">words</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">);</span>
-        <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span>
-            <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">word</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">());</span>
-        <span class="o">}</span>
-    <span class="o">}</span>
-<span class="o">}).</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="n">SYMBOLS</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
-    <span class="o">}</span>
-<span class="o">});</span>
-
-<span class="c1">//Count the extracted symbols</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">&gt;</span> <span class="n">tweetsPerStock</span> <span class="o">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">&gt;()</span> <span class="o">{</span>
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-        <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span>
-    <span class="o">}</span>
-<span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span>
-
-<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">TweetSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
-    <span class="n">Random</span> <span class="n">random</span><span class="o">;</span>
-    <span class="n">StringBuilder</span> <span class="n">stringBuilder</span><span class="o">;</span>
-
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-        <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span>
-        <span class="n">stringBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StringBuilder</span><span class="o">();</span>
-
-        <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span>
-            <span class="n">stringBuilder</span><span class="o">.</span><span class="na">setLength</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span>
-            <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="mi">3</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
-                <span class="n">stringBuilder</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">);</span>
-                <span class="n">stringBuilder</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">SYMBOLS</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="n">SYMBOLS</span><span class="o">.</span><span class="na">size</span><span class="o">())));</span>
-            <span class="o">}</span>
-            <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">stringBuilder</span><span class="o">.</span><span class="na">toString</span><span class="o">());</span>
-            <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="mi">500</span><span class="o">);</span>
-        <span class="o">}</span>
-
-    <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="streaming-joins">Streaming joins</h2>
-
-<p>Finally, we join real-time tweets and stock prices and compute a
-rolling correlation between the number of price warnings and the
-number of mentions of a given stock in the Twitter stream. As both of
-these data streams are potentially infinite, we apply the join on a
-30-second window.</p>
-
-<p><img alt="Streaming joins" src="/img/blog/blog_stream_join.png" width="60%" class="img-responsive center-block" /></p>
-
-<div class="codetabs">
-
-  <div data-lang="scala">
-
-    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Join warnings and parsed tweets</span>
-<span class="k">val</span> <span class="n">tweetsAndWarning</span> <span class="k">=</span> <span class="n">warningsPerStock</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span>
-  <span class="o">.</span><span class="n">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)</span>
-  <span class="o">.</span><span class="n">where</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-  <span class="o">.</span><span class="n">equalTo</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span> <span class="o">{</span> <span class="o">(</span><span class="n">c1</span><span class="o">,</span> <span class="n">c2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">c1</span><span class="o">.</span><span class="n">count</span><span class="o">,</span> <span class="n">c2</span><span class="o">.</span><span class="n">count</span><span class="o">)</span> <span class="o">}</span>
-
-<span class="k">val</span> <span class="n">rollingCorrelation</span> <span class="k">=</span> <span class="n">tweetsAndWarning</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
-  <span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">computeCorrelation</span> <span class="k">_</span><span class="o">)</span>
-
-<span class="n">rollingCorrelation</span> <span class="n">print</span>
-
-<span class="c1">//Compute rolling correlation</span>
-<span class="k">def</span> <span class="n">computeCorrelation</span><span class="o">(</span><span class="n">input</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">Double</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
-  <span class="k">if</span> <span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span>
-    <span class="k">val</span> <span class="n">var1</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_1</span><span class="o">)</span>
-    <span class="k">val</span> <span class="n">mean1</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">)</span>
-    <span class="k">val</span> <span class="n">var2</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span>
-    <span class="k">val</span> <span class="n">mean2</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">)</span>
-
-    <span class="k">val</span> <span class="n">cov</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">zip</span><span class="o">(</span><span class="n">var2</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">xy</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_1</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_2</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">)))</span>
-    <span class="k">val</span> <span class="n">d1</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span>
-    <span class="k">val</span> <span class="n">d2</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span>
-
-    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">d1</span> <span class="o">*</span> <span class="n">d2</span><span class="o">))</span>
-  <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-  <div data-lang="java7">
-
-    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Join warnings and parsed tweets</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">tweetsAndWarning</span> <span class="o">=</span> <span class="n">warningsPerStock</span>
-    <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">equalTo</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="n">JoinFunction</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">,</span> <span class="n">Count</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
-        <span class="nd">@Override</span>
-        <span class="kd">public</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="nf">join</span><span class="o">(</span><span class="n">Count</span> <span class="n">first</span><span class="o">,</span> <span class="n">Count</span> <span class="n">second</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-            <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">first</span><span class="o">.</span><span class="na">count</span><span class="o">,</span> <span class="n">second</span><span class="o">.</span><span class="na">count</span><span class="o">);</span>
-            <span class="o">}</span>
-    <span class="o">});</span>
-
-<span class="c1">//Compute rolling correlation</span>
-<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">rollingCorrelation</span> <span class="o">=</span> <span class="n">tweetsAndWarning</span>
-    <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span>
-    <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowCorrelation</span><span class="o">());</span>
-
-<span class="n">rollingCorrelation</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
-
-<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">WindowCorrelation</span>
-    <span class="kd">implements</span> <span class="n">WindowMapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
-
-    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">leftSum</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">rightSum</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">;</span>
-
-    <span class="kd">private</span> <span class="n">Double</span> <span class="n">leftMean</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">Double</span> <span class="n">rightMean</span><span class="o">;</span>
-
-    <span class="kd">private</span> <span class="n">Double</span> <span class="n">cov</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">Double</span> <span class="n">leftSd</span><span class="o">;</span>
-    <span class="kd">private</span> <span class="n">Double</span> <span class="n">rightSd</span><span class="o">;</span>
-
-    <span class="nd">@Override</span>
-    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> 
-        <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
-
-        <span class="n">leftSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
-        <span class="n">rightSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
-        <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
-
-        <span class="n">cov</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span>
-        <span class="n">leftSd</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span>
-        <span class="n">rightSd</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span>
-
-        <span class="c1">//compute mean for both sides, save count</span>
-        <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
-            <span class="n">leftSum</span> <span class="o">+=</span> <span class="n">pair</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
-            <span class="n">rightSum</span> <span class="o">+=</span> <span class="n">pair</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
-            <span class="n">count</span><span class="o">++;</span>
-        <span class="o">}</span>
-
-        <span class="n">leftMean</span> <span class="o">=</span> <span class="n">leftSum</span><span class="o">.</span><span class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
-        <span class="n">rightMean</span> <span class="o">=</span> <span class="n">rightSum</span><span class="o">.</span><span class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
-
-        <span class="c1">//compute covariance &amp; std. deviations</span>
-        <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
-            <span class="n">cov</span> <span class="o">+=</span> <span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f0</span> <span class="o">-</span> <span class="n">leftMean</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">rightMean</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
-        <span class="o">}</span>
-
-        <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
-            <span class="n">leftSd</span> <span class="o">+=</span> <span class="n">Math</span><span class="o">.</span><span class="na">pow</span><span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f0</span> <span class="o">-</span> <span class="n">leftMean</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
-            <span class="n">rightSd</span> <span class="o">+=</span> <span class="n">Math</span><span class="o">.</span><span class="na">pow</span><span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">rightMean</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
-        <span class="o">}</span>
-        <span class="n">leftSd</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span class="o">(</span><span class="n">leftSd</span><span class="o">);</span>
-        <span class="n">rightSd</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span class="o">(</span><span class="n">rightSd</span><span class="o">);</span>
-
-        <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">leftSd</span> <span class="o">*</span> <span class="n">rightSd</span><span class="o">));</span>
-    <span class="o">}</span>
-<span class="o">}</span></code></pre></div>
-
-  </div>
-
-</div>
-
-<p><a href="#top">Back to top</a></p>
-
-<h2 id="other-things-to-try">Other things to try</h2>
-
-<p>For a full feature overview please check the <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html">Streaming Guide</a>, which describes all the available API features.
-You are very welcome to try out our features for different use-cases we are looking forward to your experiences. Feel free to <a href="http://flink.apache.org/community.html#mailing-lists">contact us</a>.</p>
-
-<h2 id="upcoming-for-streaming">Upcoming for streaming</h2>
-
-<p>There are some aspects of Flink Streaming that are subjects to
-change by the next release making this application look even nicer.</p>
-
-<p>Stay tuned for later blog posts on how Flink Streaming works
-internally, fault tolerance, and performance measurements!</p>
-
-<p><a href="#top">Back to top</a></p>
-
-      </article>
-    </div>
-
-    <div class="row">
-      <div id="disqus_thread"></div>
-      <script type="text/javascript">
-        /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
-        var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname
-
-        /* * * DON'T EDIT BELOW THIS LINE * * */
-        (function() {
-            var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
-            dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
-             (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
-        })();
-      

<TRUNCATED>