Posted to commits@apex.apache.org by sa...@apache.org on 2016/05/13 22:27:57 UTC

[11/13] incubator-apex-site git commit: Adding apex-3.4 documentation

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/e4953041/docs/apex-3.4/application_development/index.html
----------------------------------------------------------------------
diff --git a/docs/apex-3.4/application_development/index.html b/docs/apex-3.4/application_development/index.html
new file mode 100644
index 0000000..8c8f184
--- /dev/null
+++ b/docs/apex-3.4/application_development/index.html
@@ -0,0 +1,2677 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Applications - Apache Apex Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../favicon.ico">
+  
+
+  
+  <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Applications";
+    var mkdocs_page_input_path = "application_development.md";
+    var mkdocs_page_url = "/application_development/";
+  </script>
+  
+  <script src="../js/jquery-2.1.1.min.js"></script>
+  <script src="../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../js/highlight.pack.js"></script>
+  <script src="../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href=".." class="icon icon-home"> Apache Apex Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../search.html" method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="..">Apache Apex</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Development</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../apex_development_setup/">Development Setup</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Applications</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#application-developer-guide">Application Developer Guide</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#running-a-test-application">Running A Test Application</a></li>
+                
+                    <li><a class="toctree-l4" href="#test-application-yahoo-finance-quotes">Test Application: Yahoo! Finance Quotes</a></li>
+                
+                    <li><a class="toctree-l4" href="#running-a-test-application_1">Running a Test Application</a></li>
+                
+                    <li><a class="toctree-l4" href="#local-mode">Local Mode</a></li>
+                
+                    <li><a class="toctree-l4" href="#hadoop-cluster">Hadoop Cluster</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#apache-apex-platform-overview">Apache Apex Platform Overview</a></li>
+                
+                    <li><a class="toctree-l4" href="#streaming-computational-model">Streaming Computational Model</a></li>
+                
+                    <li><a class="toctree-l4" href="#streaming-application-manager-stram">Streaming Application Manager (STRAM)</a></li>
+                
+                    <li><a class="toctree-l4" href="#hadoop-components">Hadoop Components</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#developing-an-application">Developing An Application</a></li>
+                
+                    <li><a class="toctree-l4" href="#development-process">Development Process</a></li>
+                
+                    <li><a class="toctree-l4" href="#application-api">Application API</a></li>
+                
+                    <li><a class="toctree-l4" href="#json-file-dag-specification">JSON File DAG Specification</a></li>
+                
+                    <li><a class="toctree-l4" href="#operators">Operators</a></li>
+                
+                    <li><a class="toctree-l4" href="#streams">Streams</a></li>
+                
+                    <li><a class="toctree-l4" href="#affinity-rules">Affinity Rules</a></li>
+                
+                    <li><a class="toctree-l4" href="#validating-an-application">Validating an Application</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#multi-tenancy-and-security">Multi-Tenancy and Security</a></li>
+                
+                    <li><a class="toctree-l4" href="#security">Security</a></li>
+                
+                    <li><a class="toctree-l4" href="#resource-limits">Resource Limits</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#scalability-and-partitioning">Scalability and Partitioning</a></li>
+                
+                    <li><a class="toctree-l4" href="#partitioning">Partitioning</a></li>
+                
+                    <li><a class="toctree-l4" href="#nxm-partitions">NxM Partitions</a></li>
+                
+                    <li><a class="toctree-l4" href="#parallel">Parallel</a></li>
+                
+                    <li><a class="toctree-l4" href="#parallel-partitions-with-streams-modes">Parallel Partitions with Streams Modes</a></li>
+                
+                    <li><a class="toctree-l4" href="#skew-balancing-partition">Skew Balancing Partition</a></li>
+                
+                    <li><a class="toctree-l4" href="#skew-unifier-partition">Skew Unifier Partition</a></li>
+                
+                    <li><a class="toctree-l4" href="#cascading-unifier">Cascading Unifier</a></li>
+                
+                    <li><a class="toctree-l4" href="#sla">SLA</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#fault-tolerance">Fault Tolerance</a></li>
+                
+                    <li><a class="toctree-l4" href="#state-of-the-application">State of the Application</a></li>
+                
+                    <li><a class="toctree-l4" href="#checkpointing">Checkpointing</a></li>
+                
+                    <li><a class="toctree-l4" href="#recovery-mechanisms_1">Recovery Mechanisms</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#dynamic-application-modifications">Dynamic Application Modifications</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#demos">Demos</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../application_packages/">Packages</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../operator_development/">Operators</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../autometrics/">AutoMetric API</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operations</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../apex_cli/">Apex CLI</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../security/">Security</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../compatibility/">Compatibility</a>
+        
+    </li>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="..">Apache Apex Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Development &raquo;</li>
+        
+      
+    
+    <li>Applications</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="application-developer-guide">Application Developer Guide</h1>
+<p>The Apex platform is designed to process massive amounts of
+real-time events natively in Hadoop.  It runs as a YARN (Hadoop 2.x) 
+application and leverages Hadoop as a distributed operating
+system.  All the basic distributed operating system capabilities of
+Hadoop like resource management (YARN), distributed file system (HDFS),
+multi-tenancy, security, fault-tolerance, and scalability are supported natively 
+in all Apex applications. The platform handles the details of application
+execution, including dynamic scaling, state checkpointing and recovery, and event
+processing guarantees, allowing you to focus on writing your application logic without
+mixing operational and functional concerns.</p>
+<p>In the platform, building a streaming application can be extremely
+easy and intuitive. The application is represented as a Directed
+Acyclic Graph (DAG) of computation units called <em>Operators</em> interconnected
+by data-flow edges called <em>Streams</em>. The operators process input
+streams and produce output streams. A library of common operators is
+provided to enable quick application development. If the desired
+processing is not available in the Operator Library, you can
+write a custom operator. Those interested in creating their own
+operators should refer to the <a href="../operator_development/">Operator Development Guide</a>.</p>
+<h1 id="running-a-test-application">Running A Test Application</h1>
+<p>If you are starting with the Apex platform for the first time,
+it can be informative to launch an existing application and see it run.
+One of the simplest examples provided in the <a href="https://github.com/apache/incubator-apex-malhar">Apex-Malhar repository</a> is the Pi demo application,
+which computes the value of PI using random numbers. After <a href="../apex_development_setup/">setting up the development environment</a>,
+the Pi demo can be launched as follows:</p>
+<ol>
+<li>Open the Apex Malhar files in your IDE (for example Eclipse, IntelliJ, or NetBeans)</li>
+<li>Navigate to <code>demos/pi/src/test/java/com/datatorrent/demos/ApplicationTest.java</code></li>
+<li>Run the test for ApplicationTest.java</li>
+<li>View the output in system console</li>
+</ol>
+<p>Congratulations, you just ran your first real-time streaming demo :)
+This demo is very simple and has four operators. The first operator
+emits random integers between 0 and 30,000. The second operator receives
+these integers and emits a hashmap with x and y values each time it
+receives two values. The third operator takes these values, checks whether
+x**2+y**2 is less than or equal to 30,000**2, and keeps a running count of
+the pairs that pass the check. Assuming this count is N, PI is estimated
+as 4*N/number of pairs received. The last operator prints the estimate to
+the system console.
+Here is the code snippet for the PI application. This code populates the
+DAG. Do not worry about what each line does; we will cover these
+concepts later in this document.</p>
+<pre><code class="java">// Upper bound of the random numbers; also used by the script below
+int maxValue = 30000;
+
+// Generates random numbers
+RandomEventGenerator rand = dag.addOperator(&quot;rand&quot;, new RandomEventGenerator());
+rand.setMinvalue(0);
+rand.setMaxvalue(maxValue);
+
+// Generates a round robin HashMap of &quot;x&quot; and &quot;y&quot;
+RoundRobinHashMap&lt;String,Object&gt; rrhm = dag.addOperator(&quot;rrhm&quot;, new RoundRobinHashMap&lt;String, Object&gt;());
+rrhm.setKeys(new String[] { &quot;x&quot;, &quot;y&quot; });
+
+// Calculates pi from x and y
+JavaScriptOperator calc = dag.addOperator(&quot;picalc&quot;, new JavaScriptOperator());
+calc.setPassThru(false);
+calc.put(&quot;i&quot;,0);
+calc.put(&quot;count&quot;,0);
+calc.addSetupScript(&quot;function pi() { if (x*x+y*y &lt;= &quot;+maxValue*maxValue+&quot;) { i++; } count++; return i / count * 4; }&quot;);
+calc.setInvoke(&quot;pi&quot;);
+dag.addStream(&quot;rand_rrhm&quot;, rand.integer_data, rrhm.data);
+dag.addStream(&quot;rrhm_calc&quot;, rrhm.map, calc.inBindings);
+
+// Puts results on system console
+ConsoleOutputOperator console = dag.addOperator(&quot;console&quot;, new ConsoleOutputOperator());
+dag.addStream(&quot;calc_console&quot;, calc.result, console.input);
+</code></pre>
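+
+<p>The estimate maintained by the script can also be reproduced outside the platform. The following is a minimal plain-Java sketch, not part of the demo: the class name <code>PiEstimateSketch</code>, the method <code>estimatePi</code> and the fixed seed are assumptions made for illustration. It draws random points in the square from 0 to maxValue, counts those that fall inside the quarter circle, and multiplies the ratio by 4.</p>
+<pre><code class="java">import java.util.Random;
+
+public class PiEstimateSketch
+{
+  // Estimates pi from `samples` random (x, y) pairs drawn from [0, maxValue].
+  static double estimatePi(long samples, int maxValue, long seed)
+  {
+    Random random = new Random(seed);
+    long limit = (long)maxValue * maxValue;
+    long inside = 0;
+    for (long n = 0; n &lt; samples; n++) {
+      long x = random.nextInt(maxValue + 1);
+      long y = random.nextInt(maxValue + 1);
+      if (x * x + y * y &lt;= limit) {
+        inside++;
+      }
+    }
+    // The quarter circle covers pi/4 of the square, hence the factor 4.
+    return 4.0 * inside / samples;
+  }
+
+  public static void main(String[] args)
+  {
+    System.out.println(estimatePi(1000000, 30000, 42L));
+  }
+}
+</code></pre>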
+
+<p>You can review the other demos and see what they do. The examples
+given in the Demos project cover various features of the platform and we
+strongly encourage you to read these to familiarize yourself with the
+platform. In the remaining part of this document we will go through
+details needed for you to develop and run streaming applications in
+Malhar.</p>
+<h2 id="test-application-yahoo-finance-quotes">Test Application: Yahoo! Finance Quotes</h2>
+<p>The PI application was meant to
+get you started. It is a basic application and does not fully illustrate
+the features of the platform. For the purpose of describing concepts, we
+will consider the test application shown in Figure 1. The application
+downloads tick data from <a href="http://finance.yahoo.com">Yahoo! Finance</a> and computes the
+following for four tickers, namely <a href="http://finance.yahoo.com/q?s=IBM">IBM</a>,
+<a href="http://finance.yahoo.com/q?s=GOOG">GOOG</a>, AAPL, and <a href="http://finance.yahoo.com/q?s=YHOO">YHOO</a>:</p>
+<ol>
+<li>Quote: Consisting of last trade price, last trade time, and
+    total volume for the day</li>
+<li>Per-minute chart data: Highest trade price, lowest trade
+    price, and volume during that minute</li>
+<li>Simple Moving Average: trade price over 5 minutes</li>
+</ol>
+<p>Total volume must ensure that all trade volume for that day is
+added, i.e. data loss would result in wrong results. Charting data needs
+all the trades in the same minute to go to the same slot, after which the
+slot starts afresh, so again data loss would result in wrong results. The
+aggregation for charting data is done over 1 minute. Simple moving
+average computes the average price over a 5-minute sliding window; it
+too would produce wrong results if there is data loss. Figure 1 shows
+the application with no partitioning.</p>
+<p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image00.png" /></p>
+<p>The operator StockTickInput: <a href="http://docs.google.com/../apidocs/com/datatorrent/demos/yahoofinance/StockTickInput.html">StockTickInput</a> is
+the input operator that reads live data from Yahoo! Finance once per
+interval (user configurable in milliseconds), and emits the price, the
+incremental volume, and the last trade time of each stock symbol, thus
+emulating real ticks from the exchange. We utilize the Yahoo! Finance
+CSV web service interface. For example:</p>
+<pre><code>$ GET 'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&amp;f=sl1vt1'
+&quot;IBM&quot;,203.966,1513041,&quot;1:43pm&quot;
+&quot;GOOG&quot;,762.68,1879741,&quot;1:43pm&quot;
+&quot;AAPL&quot;,444.3385,11738366,&quot;1:43pm&quot;
+&quot;YHOO&quot;,19.3681,14707163,&quot;1:43pm&quot;
+</code></pre>
+
+<p>Among all the operators in Figure 1, StockTickInput is the only
+operator that requires extra code because it contains a custom mechanism
+to get the input data. The other operators are used unchanged from the
+Malhar library.</p>
+<p>Here is the class implementation for StockTickInput:</p>
+<pre><code class="java">package com.datatorrent.demos.yahoofinance;
+
+import au.com.bytecode.opencsv.CSVReader;
+import com.datatorrent.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.cookie.CookiePolicy;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.DefaultHttpParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This operator sends price, volume and time into separate ports and calculates incremental volume.
+ */
+public class StockTickInput implements InputOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);
+  /**
+   * Time in milliseconds to sleep after each read from the server.
+   */
+  public int readIntervalMillis = 500;
+  /**
+   * The URL of the web service resource for the GET request.
+   */
+  private String url;
+  public String[] symbols;
+  private transient HttpClient client;
+  private transient GetMethod method;
+  private HashMap&lt;String, Long&gt; lastVolume = new HashMap&lt;String, Long&gt;();
+  private boolean outputEvenIfZeroVolume = false;
+  /**
+   * The output port to emit price.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort&lt;KeyValPair&lt;String, Double&gt;&gt; price = new DefaultOutputPort&lt;KeyValPair&lt;String, Double&gt;&gt;();
+  /**
+   * The output port to emit incremental volume.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort&lt;KeyValPair&lt;String, Long&gt;&gt; volume = new DefaultOutputPort&lt;KeyValPair&lt;String, Long&gt;&gt;();
+  /**
+   * The output port to emit last traded time.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort&lt;KeyValPair&lt;String, String&gt;&gt; time = new DefaultOutputPort&lt;KeyValPair&lt;String, String&gt;&gt;();
+
+  /**
+   * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&amp;f=sl1vt1
+   *
+   * @return the URL
+   */
+  private String prepareURL()
+  {
+    String str = &quot;http://download.finance.yahoo.com/d/quotes.csv?s=&quot;;
+    for (int i = 0; i &lt; symbols.length; i++) {
+      if (i != 0) {
+        str += &quot;,&quot;;
+      }
+      str += symbols[i];
+    }
+    str += &quot;&amp;f=sl1vt1&amp;e=.csv&quot;;
+    return str;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    url = prepareURL();
+    client = new HttpClient();
+    method = new GetMethod(url);
+    DefaultHttpParams.getDefaultParams().setParameter(&quot;http.protocol.cookie-policy&quot;, CookiePolicy.BROWSER_COMPATIBILITY);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void emitTuples()
+  {
+
+    try {
+      int statusCode = client.executeMethod(method);
+      if (statusCode != HttpStatus.SC_OK) {
+        System.err.println(&quot;Method failed: &quot; + method.getStatusLine());
+      } else {
+        InputStream istream = method.getResponseBodyAsStream();
+        // Process response
+        InputStreamReader isr = new InputStreamReader(istream);
+        CSVReader reader = new CSVReader(isr);
+        List&lt;String[]&gt; myEntries = reader.readAll();
+        for (String[] stringArr: myEntries) {
+          ArrayList&lt;String&gt; tuple = new ArrayList&lt;String&gt;(Arrays.asList(stringArr));
+          if (tuple.size() != 4) {
+            return;
+          }
+          // input csv is &lt;Symbol&gt;,&lt;Price&gt;,&lt;Volume&gt;,&lt;Time&gt;
+          String symbol = tuple.get(0);
+          double currentPrice = Double.valueOf(tuple.get(1));
+          long currentVolume = Long.valueOf(tuple.get(2));
+          String timeStamp = tuple.get(3);
+          long vol = currentVolume;
+          // Sends total volume in first tick, and incremental volume afterwards.
+          if (lastVolume.containsKey(symbol)) {
+            vol -= lastVolume.get(symbol);
+          }
+
+          if (vol &gt; 0 || outputEvenIfZeroVolume) {
+            price.emit(new KeyValPair&lt;String, Double&gt;(symbol, currentPrice));
+            volume.emit(new KeyValPair&lt;String, Long&gt;(symbol, vol));
+            time.emit(new KeyValPair&lt;String, String&gt;(symbol, timeStamp));
+            lastVolume.put(symbol, currentVolume);
+          }
+        }
+      }
+      Thread.sleep(readIntervalMillis);
+    } catch (InterruptedException ex) {
+      logger.debug(ex.toString());
+    } catch (IOException ex) {
+      logger.debug(ex.toString());
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume)
+  {
+    this.outputEvenIfZeroVolume = outputEvenIfZeroVolume;
+  }
+
+}
+</code></pre>
+
+<p>The operator has three output ports that emit the price of the
+stock, the volume of the stock and the last trade time of the stock,
+declared as public member variables price, volume and time of the class. The tuple of the
+price output port is a key-value
+pair with the stock symbol being the key, and the price being the value.
+The tuple of the volume output
+port is a key-value pair with the stock symbol being the key, and the
+incremental volume being the value. The tuple of the time output port is a key-value pair with the
+stock symbol being the key, and the last trade time being the
+value.</p>
+<p>Important: Since operators will be
+serialized, all input and output ports need to be declared transient
+because they are stateless and should not be serialized.</p>
+<p>The method setup(OperatorContext)
+contains the code that is necessary for setting up the HTTP
+client for querying Yahoo! Finance.</p>
+<p>The method emitTuples() contains
+the code that reads from Yahoo! Finance, and emits the data to the
+output ports of the operator. emitTuples() will be called one or more times
+within one application window as long as time is allowed within the
+window.</p>
+<p>Note that we want to emulate the tick input stream by deriving
+incremental volume data from the Yahoo! Finance data. We therefore subtract
+the previous volume from the current volume to emulate incremental
+volume for each tick.</p>
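+<p>The subtraction can be sketched in isolation as follows. This is a simplified, hypothetical helper rather than the operator itself: the class name <code>IncrementalVolumeSketch</code> and the method <code>next</code> are assumptions, and unlike StockTickInput it records every reading, whether or not anything would be emitted.</p>
+<pre><code class="java">import java.util.HashMap;
+import java.util.Map;
+
+public class IncrementalVolumeSketch
+{
+  // Last cumulative volume seen per symbol.
+  private final Map&lt;String, Long&gt; lastVolume = new HashMap&lt;String, Long&gt;();
+
+  // Returns the volume traded since the previous reading for this symbol.
+  // The first reading for a symbol returns the full cumulative volume.
+  long next(String symbol, long cumulativeVolume)
+  {
+    Long previous = lastVolume.put(symbol, cumulativeVolume);
+    return previous == null ? cumulativeVolume : cumulativeVolume - previous;
+  }
+
+  public static void main(String[] args)
+  {
+    IncrementalVolumeSketch tracker = new IncrementalVolumeSketch();
+    System.out.println(tracker.next(&quot;IBM&quot;, 1513041L)); // first tick: 1513041
+    System.out.println(tracker.next(&quot;IBM&quot;, 1513541L)); // second tick: 500
+  }
+}
+</code></pre>
+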
+<p>The operator
+DailyVolume: This operator
+reads from the input port, which contains the incremental volume tuples
+from StockTickInput, and
+aggregates the data to provide the cumulative volume. It uses the
+library class SumKeyVal&lt;K,V&gt; provided in the math package. In this case,
+SumKeyVal&lt;String,Long&gt;, where K is the stock symbol, V is the
+aggregated volume, with cumulative
+set to true. (If cumulative were set to false, SumKeyVal would
+provide the sum for the application window.) Malhar provides a number
+of built-in operators for simple operations like this so that
+application developers do not have to write them. More examples
+follow. This operator assumes that the application restarts before the
+market opens every day.</p>
+<p>The operator Quote:
+This operator has three input ports, which are price (from
+StockTickInput), daily_vol (from
+DailyVolume), and time (from
+StockTickInput). This operator
+just consolidates the three data items and emits the consolidated
+data. It utilizes the class ConsolidatorKeyVal&lt;K&gt; from the
+stream package.</p>
+<p>The operator HighLow: This operator reads from the input port,
+which contains the price tuples from StockTickInput, and provides the high and the
+low price within the application window. It utilizes the library class
+RangeKeyVal&lt;K,V&gt; provided
+in the math package. In this case,
+RangeKeyVal&lt;String,Double&gt;.</p>
+<p>The operator MinuteVolume:
+This operator reads from the input port, which contains the
+volume tuples from StockTickInput,
+and aggregates the data to provide the sum of the volume within one
+minute. Like the operator DailyVolume, this operator also uses
+SumKeyVal&lt;String,Long&gt;, but
+with cumulative set to false. The
+Application Window is set to one minute. We will explain how to set this
+later.</p>
+<p>The operator Chart:
+This operator is very similar to the operator Quote, except that it takes inputs from
+HighLow and MinuteVolume and outputs the consolidated tuples
+to the output port.</p>
+<p>The operator PriceSMA:
+SMA stands for Simple Moving Average. It reads from the
+input port, which contains the price tuples from StockTickInput, and
+provides the moving average price of the stock. It utilizes
+SimpleMovingAverage&lt;String,Double&gt;, which is provided in the
+multiwindow package.
+SimpleMovingAverage keeps track of the data of the previous N
+application windows in a sliding manner. For each end window event, it
+provides the average of the data in those application windows.</p>
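+<p>To make the sliding behavior concrete, here is a minimal single-key sketch in plain Java. It is not the library implementation: the class name <code>SlidingAverageSketch</code> and its methods are assumptions for illustration, and the real operator tracks one average per key. It keeps a sum and a count for each of the last N windows and averages over all of them when a window ends.</p>
+<pre><code class="java">import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class SlidingAverageSketch
+{
+  private final int windowCount;
+  // One {sum, count} pair per retained window, oldest first.
+  private final Deque&lt;double[]&gt; windows = new ArrayDeque&lt;double[]&gt;();
+  private double currentSum;
+  private long currentCount;
+
+  SlidingAverageSketch(int windowCount)
+  {
+    this.windowCount = windowCount;
+  }
+
+  // Records one value in the window that is currently open.
+  void add(double value)
+  {
+    currentSum += value;
+    currentCount++;
+  }
+
+  // Closes the open window and returns the average over the retained windows.
+  double endWindow()
+  {
+    windows.addLast(new double[] {currentSum, currentCount});
+    if (windows.size() &gt; windowCount) {
+      windows.removeFirst(); // slide: drop the oldest window
+    }
+    currentSum = 0;
+    currentCount = 0;
+    double sum = 0;
+    double count = 0;
+    for (double[] window : windows) {
+      sum += window[0];
+      count += window[1];
+    }
+    return count == 0 ? 0 : sum / count;
+  }
+}
+</code></pre>
+
+<p>With a window count of 2 and one value per window (10, 20, then 30), successive calls to endWindow() return 10, 15 and 25, because the first window drops out of the average once the third one closes.</p>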
+<p>The operator Console:
+This operator just outputs the input tuples to the console
+(or stdout). In this example, there are four console operators, which connect to the output
+of Quote, Chart, PriceSMA and VolumeSMA. In
+practice, they should be replaced by operators that use the data to
+produce visualization artifacts like charts.</p>
+<p>Connecting the operators together and constructing the
+DAG: Now that we know the
+operators used, we will create the DAG, set the streaming window size,
+instantiate the operators, and connect the operators together by adding
+streams that connect the output ports with the input ports among those
+operators. This code is in the file YahooFinanceApplication.java. Refer to Figure 1
+again for the graphical representation of the DAG. The last method in
+the code, namely populateDAG(),
+does all that. The rest of the methods are just for setting up the
+operators.</p>
+<pre><code class="java">package com.datatorrent.demos.yahoofinance;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.math.RangeKeyVal;
+import com.datatorrent.lib.math.SumKeyVal;
+import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
+import com.datatorrent.lib.stream.ConsolidatorKeyVal;
+import com.datatorrent.lib.util.HighLow;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Yahoo! Finance application demo. &lt;p&gt;
+ *
+ * Get Yahoo finance feed and calculate minute price range, minute volume, simple moving average of 5 minutes.
+ */
+public class Application implements StreamingApplication
+{
+  private int streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms)
+  private int appWindowCountMinute = 60;   // 1 minute
+  private int appWindowCountSMA = 5 * 60;  // 5 minute
+
+  /**
+   * Get actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded time.
+   */
+  public StockTickInput getStockTickInputOperator(String name, DAG dag)
+  {
+    StockTickInput oper = dag.addOperator(name, StockTickInput.class);
+    oper.readIntervalMillis = 200;
+    return oper;
+  }
+
+  /**
+   * This sends total daily volume by adding volumes from each ticks.
+   */
+  public SumKeyVal&lt;String, Long&gt; getDailyVolumeOperator(String name, DAG dag)
+  {
+    SumKeyVal&lt;String, Long&gt; oper = dag.addOperator(name, new SumKeyVal&lt;String, Long&gt;());
+    oper.setType(Long.class);
+    oper.setCumulative(true);
+    return oper;
+  }
+
+  /**
+   * Get aggregated volume of 1 minute and send at the end window of 1 minute.
+   */
+  public SumKeyVal&lt;String, Long&gt; getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)
+  {
+    SumKeyVal&lt;String, Long&gt; oper = dag.addOperator(name, new SumKeyVal&lt;String, Long&gt;());
+    oper.setType(Long.class);
+    oper.setEmitOnlyWhenChanged(true);
+    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);
+    return oper;
+  }
+
+  /**
+   * Get High-low range for 1 minute.
+   */
+  public RangeKeyVal&lt;String, Double&gt; getHighLowOperator(String name, DAG dag, int appWindowCount)
+  {
+    RangeKeyVal&lt;String, Double&gt; oper = dag.addOperator(name, new RangeKeyVal&lt;String, Double&gt;());
+    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);
+    oper.setType(Double.class);
+    return oper;
+  }
+
+  /**
+   * Quote (Merge price, daily volume, time)
+   */
+  public ConsolidatorKeyVal&lt;String,Double,Long,String,?,?&gt; getQuoteOperator(String name, DAG dag)
+  {
+    ConsolidatorKeyVal&lt;String,Double,Long,String,?,?&gt; oper = dag.addOperator(name, new ConsolidatorKeyVal&lt;String,Double,Long,String,Object,Object&gt;());
+    return oper;
+  }
+
+  /**
+   * Chart (Merge minute volume and minute high-low)
+   */
+  public ConsolidatorKeyVal&lt;String,HighLow,Long,?,?,?&gt; getChartOperator(String name, DAG dag)
+  {
+    ConsolidatorKeyVal&lt;String,HighLow,Long,?,?,?&gt; oper = dag.addOperator(name, new ConsolidatorKeyVal&lt;String,HighLow,Long,Object,Object,Object&gt;());
+    return oper;
+  }
+
+  /**
+   * Get simple moving average of price.
+   */
+  public SimpleMovingAverage&lt;String, Double&gt; getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)
+  {
+    SimpleMovingAverage&lt;String, Double&gt; oper = dag.addOperator(name, new SimpleMovingAverage&lt;String, Double&gt;());
+    oper.setWindowSize(appWindowCount);
+    oper.setType(Double.class);
+    return oper;
+  }
+
+  /**
+   * Get console for output.
+   */
+  public InputPort&lt;Object&gt; getConsole(String name, /*String nodeName,*/ DAG dag, String prefix)
+  {
+    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
+    oper.setStringFormat(prefix + &quot;: %s&quot;);
+    return oper.input;
+  }
+
+  /**
+   * Create Yahoo Finance Application DAG.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    dag.getAttributes().put(DAG.STRAM_WINDOW_SIZE_MILLIS,streamingWindowSizeMilliSeconds);
+
+    StockTickInput tick = getStockTickInputOperator(&quot;StockTickInput&quot;, dag);
+    SumKeyVal&lt;String, Long&gt; dailyVolume = getDailyVolumeOperator(&quot;DailyVolume&quot;, dag);
+    ConsolidatorKeyVal&lt;String,Double,Long,String,?,?&gt; quoteOperator = getQuoteOperator(&quot;Quote&quot;, dag);
+
+    RangeKeyVal&lt;String, Double&gt; highlow = getHighLowOperator(&quot;HighLow&quot;, dag, appWindowCountMinute);
+    SumKeyVal&lt;String, Long&gt; minuteVolume = getMinuteVolumeOperator(&quot;MinuteVolume&quot;, dag, appWindowCountMinute);
+    ConsolidatorKeyVal&lt;String,HighLow,Long,?,?,?&gt; chartOperator = getChartOperator(&quot;Chart&quot;, dag);
+
+    SimpleMovingAverage&lt;String, Double&gt; priceSMA = getPriceSimpleMovingAverageOperator(&quot;PriceSMA&quot;, dag, appWindowCountSMA);
+    DefaultPartitionCodec&lt;String, Double&gt; codec = new DefaultPartitionCodec&lt;String, Double&gt;();
+    dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);
+    dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);
+    dag.addStream(&quot;price&quot;, tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
+    dag.addStream(&quot;vol&quot;, tick.volume, dailyVolume.data, minuteVolume.data);
+    dag.addStream(&quot;time&quot;, tick.time, quoteOperator.in3);
+    dag.addStream(&quot;daily_vol&quot;, dailyVolume.sum, quoteOperator.in2);
+
+    dag.addStream(&quot;quote_data&quot;, quoteOperator.out, getConsole(&quot;quoteConsole&quot;, dag, &quot;QUOTE&quot;));
+
+    dag.addStream(&quot;high_low&quot;, highlow.range, chartOperator.in1);
+    dag.addStream(&quot;vol_1min&quot;, minuteVolume.sum, chartOperator.in2);
+    dag.addStream(&quot;chart_data&quot;, chartOperator.out, getConsole(&quot;chartConsole&quot;, dag, &quot;CHART&quot;));
+
+    dag.addStream(&quot;sma_price&quot;, priceSMA.doubleSMA, getConsole(&quot;priceSMAConsole&quot;, dag, &quot;Price SMA&quot;));
+  }
+
+}
+</code></pre>
+
+<p>Note that we also set a user-specific sliding window for SMA that
+keeps track of the previous N data points. Do not confuse this with the
+attribute APPLICATION_WINDOW_COUNT.</p>
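+<p>To make the idea of a sliding window over the previous N data points concrete, here is a minimal sketch in plain Java. The class <code>SlidingAverage</code> and its <code>add</code> method are hypothetical names chosen for illustration; this is not the Malhar <code>SimpleMovingAverage</code> operator, only a model of evicting the oldest point once N points are held.</p>

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper for illustration; not part of the Apex or Malhar API.
class SlidingAverage {
  private final int size;                                   // N: number of data points kept
  private final Deque<Double> points = new ArrayDeque<>();  // oldest point first
  private double sum;

  SlidingAverage(int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    this.size = size;
  }

  // Adds a data point, evicts the oldest one once more than N are held,
  // and returns the average of the points currently in the window.
  double add(double value) {
    points.addLast(value);
    sum += value;
    if (points.size() > size) {
      sum -= points.removeFirst();
    }
    return sum / points.size();
  }

  public static void main(String[] args) {
    SlidingAverage sma = new SlidingAverage(3);
    for (double price : new double[] {10.0, 20.0, 30.0, 40.0}) {
      System.out.println(sma.add(price));  // prints 10.0, 15.0, 20.0, 30.0
    }
  }
}
```

+<p>The window here is counted in data points, which is why it is independent of how many streaming windows make up an application window.</p>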
+<p>In the rest of this chapter we will run through the process of
+running this application. We assume that you are familiar with the details
+of your Hadoop infrastructure. For installation
+details please refer to the <a href="http://docs.datatorrent.com/installation/">Installation Guide</a>.</p>
+<h2 id="running-a-test-application_1">Running a Test Application</h2>
+<p>We will now describe how to run the Yahoo!
+Finance application described above in different modes
+(local mode, single node on Hadoop, and multiple nodes on Hadoop).</p>
+<p>The platform runs streaming applications under the control of a
+light-weight Streaming Application Manager (STRAM). Each application has
+its own instance of STRAM. STRAM launches the application and
+continually provides run-time monitoring and analysis, taking action
+such as load scaling or outage recovery as needed. We will discuss
+STRAM in more detail in the next chapter.</p>
+<p>The instructions below assume that the platform was installed in a
+directory &lt;INSTALL_DIR&gt; and the command line interface (CLI) will
+be used to launch the demo application. An application can be run in
+local mode (in the IDE or from the command line) or on a Hadoop cluster.</p>
+<p>To start the Apex CLI, run:</p>
+<pre><code>&lt;INSTALL_DIR&gt;/bin/apex
+</code></pre>
+<p>The command line prompt appears. To start the application in local mode (the actual version number in the file name may differ):</p>
+<pre><code>apex&gt; launch -local &lt;INSTALL_DIR&gt;/yahoo-finance-demo-3.2.0-SNAPSHOT.apa
+</code></pre>
+<p>To terminate the application in local mode, press Ctrl-C.</p>
+<p>To run the application on the Hadoop cluster (the actual version
+number in the file name may differ):</p>
+<pre><code>apex&gt; launch &lt;INSTALL_DIR&gt;/yahoo-finance-demo-3.2.0-SNAPSHOT.apa
+</code></pre>
+<p>To stop the application running in Hadoop, terminate it in the Apex CLI:</p>
+<pre><code>apex&gt; kill-app
+</code></pre>
+<p>Executing the application in either mode includes the following
+steps. At a high level, STRAM (Streaming Application Manager) validates
+the application (DAG), translates the logical plan to the physical plan
+and then launches the execution engine. The mode determines the
+resources needed and how they are used.</p>
+<h2 id="local-mode">Local Mode</h2>
+<p>In local mode, the application is run as a single process with multiple threads. Although a
+few Hadoop classes are needed, there is no dependency on a Hadoop
+cluster or Hadoop services. The local file system is used in place of
+HDFS. This mode allows a quick run of an application in a single process
+sandbox, and hence is the most suitable to debug and analyze the
+application logic. This mode is recommended for developing the
+application and can be used for running applications within the IDE for
+functional testing purposes. Due to limited resources and lack of
+scalability an application running in this single process mode is more
+likely to encounter throughput bottlenecks. A distributed cluster is
+recommended for benchmarking and production testing.</p>
+<h2 id="hadoop-cluster">Hadoop Cluster</h2>
+<p>In this section we discuss various Hadoop cluster setups.</p>
+<h3 id="single-node-cluster">Single Node Cluster</h3>
+<p>In a single node Hadoop cluster all services are deployed on a
+single server (a developer can use his/her development machine as a
+single node cluster). The platform does not distinguish between a single
+or multi-node setup and behaves exactly the same in both cases.</p>
+<p>In this mode, the resource manager, name node, data node, and node
+manager occupy one process each. This is an example of running a
+streaming application as a multi-process application on the same server.
+With the prevalence of fast, multi-core systems, this mode is effective for
+debugging, fine tuning, and generic analysis before submitting the job
+to a larger Hadoop cluster. In this mode, execution uses the Hadoop
+services and hence is likely to identify issues that are related to the
+Hadoop environment (such issues will not be uncovered in local mode).
+The throughput will obviously not be as high as on a multi-node Hadoop
+cluster. Additionally, since each container (i.e. Java process) requires
+a significant amount of memory, you will be able to run a much smaller
+number of containers than on a multi-node cluster.</p>
+<h3 id="multi-node-cluster">Multi-Node Cluster</h3>
+<p>In a multi-node Hadoop cluster all the services of Hadoop are
+typically distributed across multiple nodes in a production or
+production-level test environment. Upon launch the application is
+submitted to the Hadoop cluster and executes as a multi-process application on multiple nodes.</p>
+<p>Before you start deploying, testing and troubleshooting your
+application on a cluster, you should ensure that Hadoop (version 2.2.0
+or later) is properly installed and
+you have basic skills for working with it.</p>
+<hr />
+<h1 id="apache-apex-platform-overview">Apache Apex Platform Overview</h1>
+<h2 id="streaming-computational-model">Streaming Computational Model</h2>
+<p>In this chapter, we describe the basics of the real-time streaming platform and its computational model.</p>
+<p>The platform is designed to enable completely asynchronous real-time computations, done in as unblocked a way as possible with
+minimal overhead.</p>
+<p>Applications running in the platform are represented by a Directed
+Acyclic Graph (DAG) made up of operators and streams. All computations
+are done in memory on arrival of
+the input data, with an option to save the output to disk (HDFS) in a
+non-blocking way. The data that flows between operators consists of
+atomic data elements. Each data element along with its type definition
+(henceforth called schema) is
+called a tuple. An application is a
+design of the flow of these tuples to and from
+the appropriate compute units to enable the computation of the final
+desired results. A message queue (henceforth called
+buffer server) manages tuples streaming
+between compute units in different processes. This server keeps track of
+all consumers, publishers, and partitions, and enables replay. More
+information is given in a later section.</p>
+<p>The streaming application is monitored by a decision making entity
+called STRAM (streaming application
+manager). STRAM is designed to be a lightweight
+controller that has minimal but sufficient interaction with the
+application. This is done via periodic heartbeats. The
+STRAM does the initial launch and periodically analyzes the system
+metrics to decide if any run time action needs to be taken.</p>
+<p>A fundamental building block for the streaming platform
+is the concept of breaking up a stream into equal finite time slices
+called streaming windows. Each window contains the ordered
+set of tuples in that time slice. A typical duration of a window is 500
+ms, but can be configured per application (the Yahoo! Finance
+application configures this value in the properties.xml file to be 1000 ms = 1 s). Each
+window is preceded by a begin_window event and is terminated by an
+end_window event, and is assigned
+a unique window ID. Even though the platform performs computations at
+the tuple level, bookkeeping is done at the window boundary, making the
+computations within a window an atomic event in the platform. We can
+think of each window as an atomic
+micro-batch of tuples, to be processed together as one
+atomic operation (see Figure 2).</p>
+<p>This atomic batching allows the platform to avoid the very steep
+per tuple bookkeeping cost and instead has a manageable per batch
+bookkeeping cost. This translates to higher throughput, low recovery
+time, and higher scalability. Later in this document we illustrate how
+the atomic micro-batch concept allows more efficient optimization
+algorithms.</p>
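+<p>The following sketch models how a stream can be cut into fixed-width streaming windows so that bookkeeping happens once per window rather than once per tuple. It is plain Java, independent of the platform; the class <code>StreamingWindows</code>, its methods, and the simple integer window ID are assumptions made for illustration and do not reflect how the platform actually assigns window IDs.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of streaming windows; names and the window ID scheme
// are illustrative assumptions, not the platform's implementation.
class StreamingWindows {
  // Maps an arrival time to the index of the fixed-width window that contains it.
  static long windowId(long arrivalMillis, long startMillis, long widthMillis) {
    return (arrivalMillis - startMillis) / widthMillis;
  }

  // Groups arrival times by window, preserving arrival order inside each window.
  static Map<Long, List<Long>> slice(long[] arrivals, long startMillis, long widthMillis) {
    Map<Long, List<Long>> windows = new TreeMap<>();
    for (long t : arrivals) {
      long id = windowId(t, startMillis, widthMillis);
      windows.computeIfAbsent(id, k -> new ArrayList<>()).add(t);
    }
    return windows;
  }

  public static void main(String[] args) {
    long[] arrivals = {0, 120, 499, 500, 730, 1400};
    // 500 ms windows: window 0 holds three tuples, window 1 two, window 2 one.
    for (Map.Entry<Long, List<Long>> w : slice(arrivals, 0, 500).entrySet()) {
      System.out.println("begin_window " + w.getKey());
      for (long t : w.getValue()) {
        System.out.println("  tuple@" + t);
      }
      // Bookkeeping would happen once here, at the window boundary.
      System.out.println("end_window " + w.getKey());
    }
  }
}
```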
+<p>The platform also has built-in support for
+application windows. An application window is part of the
+application specification, and can be a small or large multiple of the
+streaming window. An example from our Yahoo! Finance test application
+is the moving average, calculated over a sliding application window of 5
+minutes, which equates to 300 (= 5 * 60) streaming windows.</p>
+<p>Note that these two window concepts are distinct. A streaming
+window is an abstraction of many tuples into a higher atomic event for
+easier management. An application window is a group of consecutive
+streaming windows used for data aggregation (e.g. sum, average, maximum,
+minimum) on a per-operator level.</p>
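+<p>As a rough illustration of an application window as a group of consecutive streaming windows, the sketch below sums values across N streaming windows and emits one result per application window. For simplicity it uses non-overlapping (tumbling) application windows rather than a sliding one. The class <code>AppWindowSum</code> and its methods are hypothetical and only mirror the end-of-window callback described in the text.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Toy aggregator for illustration; class and method names are hypothetical.
class AppWindowSum {
  private final int appWindowCount;              // streaming windows per application window
  private int windowsSeen;
  private long sum;
  final List<Long> emitted = new ArrayList<>();  // one entry per completed application window

  AppWindowSum(int appWindowCount) {
    this.appWindowCount = appWindowCount;
  }

  // Called for every tuple inside a streaming window.
  void process(long value) {
    sum += value;
  }

  // Called at the end of every streaming window; emits and resets only
  // when a full application window has elapsed.
  void endWindow() {
    windowsSeen++;
    if (windowsSeen == appWindowCount) {
      emitted.add(sum);
      sum = 0;
      windowsSeen = 0;
    }
  }

  public static void main(String[] args) {
    int count = 5 * 60;  // 5 minutes of 1-second streaming windows
    AppWindowSum op = new AppWindowSum(count);
    for (int w = 0; w < 2 * count; w++) {
      op.process(1);     // one tuple per streaming window
      op.endWindow();
    }
    System.out.println(op.emitted);  // prints [300, 300]
  }
}
```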
+<p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image02.png" /></p>
+<p>Alongside the platform, a set of
+predefined, benchmarked standard library operator templates is provided
+for ease of use and rapid development of applications. These
+operators are open sourced to the Apache Software Foundation under the
+project name "Malhar" as part of our efforts to foster community
+innovation. Some operators can be used in a DAG as is, while others
+have properties that can be set to specify the
+desired computation. Those interested in details should refer to the
+<a href="https://github.com/apache/incubator-apex-malhar">Apex-Malhar operator library</a>.</p>
+<p>The platform is a Hadoop YARN native
+application. It runs in a Hadoop cluster just like any
+other YARN application (MapReduce etc.) and is designed to seamlessly
+integrate with the rest of the Hadoop technology stack. It leverages Hadoop as
+much as possible and relies on it as its distributed operating system.
+Hadoop dependencies include resource management, compute/memory/network
+allocation, HDFS, security, fault tolerance, monitoring, metrics,
+multi-tenancy, logging etc. Hadoop classes/concepts are reused as much
+as possible.  The aim is to enable enterprises
+to leverage their existing Hadoop infrastructure for real time streaming
+applications. The platform is designed to scale with big
+data applications and scale with Hadoop.</p>
+<p>A streaming application is an asynchronous execution of
+computations across distributed nodes. All computations are done in
+parallel on a distributed cluster. The computation model is designed to
+do as many parallel computations as possible in a non-blocking fashion.
+Monitoring of the entire application is done on (streaming)
+window boundaries with a streaming window as an atomic entity. A window
+completion is a quantum of work done. There is no assumption that an
+operator can be interrupted at precisely a particular tuple or window.</p>
+<p>An operator itself also
+cannot assume or predict the exact time a tuple that it emitted would
+get consumed by downstream operators. The operator processes the tuples
+it gets and simply emits new tuples based on its business logic. The
+only guarantee it has is that the upstream operators are processing
+either the current or some later window, and the downstream operator is
+processing either the current or some earlier window. The completion of
+a window (i.e. propagation of the end_window event through an operator) in any
+operator guarantees that all upstream operators have finished processing
+this window. Thus, the end_window event is blocking on an operator
+with multiple inputs, and is a synchronization point in the DAG. The
+begin_window event does not have
+any such restriction; a single begin_window event from any upstream operator
+triggers the operator to start processing tuples.</p>
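+<p>The asymmetry between the two window events can be sketched for an operator fed by several upstream operators: the first begin_window opens the window, while end_window is only propagated once it has arrived from every upstream. The class <code>WindowBarrier</code> below is a hypothetical toy model in plain Java, not the platform's implementation.</p>

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of window markers at an operator with several upstream inputs.
// All names are hypothetical and chosen for illustration.
class WindowBarrier {
  private final int inputPorts;
  private final Set<Integer> ended = new HashSet<>();  // ports that delivered end_window
  private boolean open;

  WindowBarrier(int inputPorts) {
    this.inputPorts = inputPorts;
  }

  // The first begin_window from any port opens the window.
  // Returns true only when this call opened it.
  boolean beginWindow(int port) {
    if (open) {
      return false;
    }
    open = true;
    return true;
  }

  // end_window is propagated only after every input port has delivered it.
  // Returns true when the window is complete and can be forwarded downstream.
  boolean endWindow(int port) {
    ended.add(port);
    if (ended.size() < inputPorts) {
      return false;
    }
    ended.clear();
    open = false;
    return true;
  }

  public static void main(String[] args) {
    WindowBarrier op = new WindowBarrier(2);
    System.out.println(op.beginWindow(0));  // true: window opens on the first marker
    System.out.println(op.beginWindow(1));  // false: window is already open
    System.out.println(op.endWindow(0));    // false: still waiting for port 1
    System.out.println(op.endWindow(1));    // true: all inputs finished the window
  }
}
```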
+<h2 id="streaming-application-manager-stram">Streaming Application Manager (STRAM)</h2>
+<p>Streaming Application Manager (STRAM) is the Hadoop YARN native
+application master. STRAM is the first process that is activated upon
+application launch and orchestrates the streaming application on the
+platform. STRAM is a lightweight controller process. The
+responsibilities of STRAM include</p>
+<ol>
+<li>
+<p>Running the Application</p>
+<ul>
+<li>Read the logical plan of the application (DAG) submitted by the client</li>
+<li>Validate the logical plan</li>
+<li>Translate the logical plan into a physical plan, where certain operators may be partitioned (i.e. replicated) to multiple operators for handling load.</li>
+<li>Request resources (Hadoop containers) from Resource Manager,
+    per physical plan</li>
+<li>Based on acquired resources and application attributes, create
+    an execution plan by partitioning the DAG into fragments,
+    each assigned to a different container.</li>
+<li>Execute the application by deploying each fragment to
+    its container. Containers then start stream processing and run
+    autonomously, processing one streaming window after another. Each
+    container is represented as an instance of the StreamingContainer class, which updates
+    STRAM via the heartbeat protocol and processes directions received
+    from STRAM.</li>
+</ul>
+</li>
+<li>
+<p>Continually monitoring the application via heartbeats from each StreamingContainer</p>
+</li>
+<li>Collecting Application System Statistics and Logs</li>
+<li>Logging all application-wide decisions taken</li>
+<li>Providing system data on the state of the application via a  Web Service.</li>
+<li>
+<p>Supporting Fault Tolerance</p>
+<ul>
+<li>Detecting a node outage</li>
+<li>Requesting a replacement resource from the Resource Manager
+    and scheduling state restoration for the streaming operators</li>
+<li>Saving state to Zookeeper</li>
+</ul>
+</li>
+<li>
+<p>Supporting Dynamic Partitioning: Periodically evaluating the SLA and modifying the physical plan if required
+    (the logical plan does not change).</p>
+</li>
+<li>Enabling Security: Distributing security tokens for distributed components of the execution engine
+    and securing web service requests.</li>
+<li>Enabling Dynamic modification of DAG: In the future, we intend to allow for user initiated
+    modification of the logical plan to allow for changes to the
+    processing logic and functionality.</li>
+</ol>
+<p>An example of the Yahoo! Finance Quote application scheduled on a
+cluster of 5 Hadoop containers (processes) is shown in Figure 3.</p>
+<p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image01.png" /></p>
+<p>An example for the translation from a logical plan to a physical
+plan and an execution plan for a subset of the application is shown in
+Figure 4.</p>
+<p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image04.png" /></p>
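+<p>When a logical operator is partitioned into several physical instances, tuples have to be routed consistently so that all tuples with the same key reach the same instance. The sketch below shows one common way to do this, hashing the key modulo the number of partitions. The class <code>KeyPartitioner</code> is hypothetical, and the hash-modulo scheme is an assumption for illustration; it is not a description of how the platform's stream codecs select a partition.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative key-based routing; the scheme is an assumption, not the platform's.
class KeyPartitioner {
  // Chooses a partition in the range [0, partitions) for the given key.
  // floorMod keeps the result non-negative even for negative hash codes.
  static int partitionFor(Object key, int partitions) {
    return Math.floorMod(key.hashCode(), partitions);
  }

  // Groups keys by the partition they are routed to.
  static Map<Integer, List<String>> assign(List<String> keys, int partitions) {
    Map<Integer, List<String>> result = new TreeMap<>();
    for (String key : keys) {
      result.computeIfAbsent(partitionFor(key, partitions), p -> new ArrayList<>()).add(key);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> symbols = List.of("YHOO", "GOOG", "AAPL", "YHOO", "IBM");
    // Every occurrence of the same symbol lands in the same partition.
    System.out.println(assign(symbols, 3));
  }
}
```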
+<h2 id="hadoop-components">Hadoop Components</h2>
+<p>In this section we cover some aspects of Hadoop that your
+streaming application interacts with. This section is not meant to
+educate the reader on Hadoop, but just get the reader acquainted with
+the terms. We strongly advise readers to learn Hadoop from other
+sources.</p>
+<p>A streaming application runs as a native Hadoop 2.2 application.
+Hadoop 2.2 does not differentiate between a map-reduce job and other
+applications, and hence as far as Hadoop is concerned, the streaming
+application is just another job. This means that your application
+leverages all the bells and whistles Hadoop provides and is fully
+supported within the Hadoop technology stack. The platform is responsible
+for properly integrating itself with the relevant components of Hadoop
+that exist today and those that may emerge in the future.</p>
+<p>All investments that leverage multi-tenancy (for example quotas
+and queues), security (for example Kerberos), data flow integration (for
+example copying data into and out of HDFS), monitoring, metrics collection,
+etc. will require no changes when streaming applications run on
+Hadoop.</p>
+<h3 id="yarn">YARN</h3>
+<p><a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site">YARN</a> is
+the core library of Hadoop 2.2 that is tasked with resource management
+and works as a distributed application framework. In this section we
+will walk through YARN's components. In Hadoop 2.2, the old JobTracker
+has been replaced by a combination of ResourceManager (RM) and
+ApplicationMaster (AM).</p>
+<h4 id="resource-manager-rm">Resource Manager (RM)</h4>
+<p><a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html">ResourceManager</a> (RM)
+manages all the distributed resources. It allocates and arbitrates all
+the slots and the resources (CPU, memory, network) of these slots. It
+works with per-node NodeManagers (NMs) and per-application
+ApplicationMasters (AMs). Currently memory usage is monitored by RM; in
+upcoming releases it will have CPU as well as network management. RM is
+shared by map-reduce and streaming applications. Running streaming
+applications requires no changes in the RM.</p>
+<h4 id="application-master-am">Application Master (AM)</h4>
+<p>The AM is the watchdog or monitoring process for your application
+and has the responsibility of negotiating resources with RM and
+interacting with NodeManagers to get the allocated containers started.
+The AM is the starting point of your application and is considered user
+code (not system Hadoop code). The AM itself runs in one container. All
+resource management within the application is managed by the AM. This
+is a critical feature for Hadoop 2.2 where tasks done by JobTracker in
+Hadoop 1.0 have been distributed allowing Hadoop 2.2 to scale much
+beyond Hadoop 1.0. STRAM is a native YARN ApplicationMaster.</p>
+<h4 id="node-managers-nm">Node Managers (NM)</h4>
+<p>There is one <a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html">NodeManager</a> (NM)
+per node in the cluster. All the containers (i.e. processes) on that
+node are monitored by the NM. It takes instructions from RM and manages
+resources of that node as per RM instructions. NM interactions are the same
+for map-reduce and for streaming applications. Running streaming
+applications requires no changes in the NM.</p>
+<h4 id="rpc-protocol">RPC Protocol</h4>
+<p>Communication among RM, AM, and NM is done via the Hadoop RPC
+protocol. Streaming applications use the same protocol to send their
+data. No changes are needed in RPC support provided by Hadoop to enable
+communication done by components of your application.</p>
+<h3 id="hdfs">HDFS</h3>
+<p>Hadoop includes a highly fault tolerant, high throughput
+distributed file system (<a href="http://hadoop.apache.org/docs/r1.0.4/hdfs_design.html">HDFS</a>).
+It runs on commodity hardware, and your streaming application will, by
+default, use it. There is no difference between files created by a
+streaming application and those created by map-reduce.</p>
+<h1 id="developing-an-application">Developing An Application</h1>
+<p>In this chapter we describe the methodology to develop an
+application using the Realtime Streaming Platform. The platform was
+designed to make it easy to build and launch sophisticated streaming
+applications with the developer having to deal only with the
+application/business logic. The platform deals with details of where to
+run what operators on which servers and how to correctly route streams
+of data among them.</p>
+<h2 id="development-process">Development Process</h2>
+<p>While the platform does not mandate a specific methodology or set
+of development tools, we have recommendations to maximize productivity
+for the different phases of application development.</p>
+<h4 id="design">Design</h4>
+<ul>
+<li>Identify common, reusable operators. Use a library
+    if possible.</li>
+<li>Identify scalability and performance requirements before
+    designing the DAG.</li>
+<li>Leverage attributes that the platform supports for scalability
+    and performance.</li>
+<li>Use operators that are benchmarked and tested so that later
+    surprises are minimized. If you have glue code, create appropriate
+    unit tests for it.</li>
+<li>Use THREAD_LOCAL locality for high throughput streams. If all
+    the operators on that stream cannot fit in one container,
+    try NODE_LOCAL locality. Both THREAD_LOCAL and
+    NODE_LOCAL streams avoid the Network Interface Card (NIC)
+    completely. The former uses intra-process communication to also avoid
+    serialization-deserialization overhead.</li>
+<li>The overall throughput and latencies are not necessarily
+    correlated to the number of operators in a simple way -- the
+    relationship is more nuanced. A lot depends on how much work
+    individual operators are doing, how many are able to operate in
+    parallel, and how much data is flowing through the arcs of the DAG.
+    It is, at times, better to break a computation down into its
+    constituent simple parts and then stitch them together via streams
+    to better utilize the compute resources of the cluster. Decide on a
+    per application basis the fine line between complexity of each
+    operator vs too many streams. Doing multiple computations in one
+    operator does save network I/O, while operators that are too complex
+    are hard to maintain.</li>
+<li>As far as possible, avoid operators that depend on the relative order
+    of tuples arriving on two streams. In such cases behavior is not idempotent.</li>
+<li>Persist key information to HDFS if possible; it may be useful
+    for debugging later.</li>
+<li>Decide on an appropriate fault tolerance mechanism. If some
+    data loss is acceptable, use the at-most-once mechanism as it has
+    the fastest recovery.</li>
+</ul>
+<h4 id="creating-new-project">Creating New Project</h4>
+<p>Please refer to the <a href="../application_packages/">Apex Application Packages</a> for
+the basic steps for creating a new project.</p>
+<h4 id="writing-the-application-code">Writing the application code</h4>
+<p>Preferably use an IDE (Eclipse, NetBeans etc.) that allows you to
+manage dependencies and assists with the Java coding. Specific benefits
+include ease of managing operator library jar files, individual operator
+classes, ports and properties. It will also highlight and help
+rectify issues such as type mismatches when adding streams while
+typing.</p>
+<h4 id="testing">Testing</h4>
+<p>Write test cases with JUnit or a similar test framework so that code
+is tested as it is written. For such testing, the DAG can run in local
+mode within the IDE. Doing this may involve writing mock input or output
+operators for the integration points with external systems. For example,
+instead of reading from a live data stream, the application in test mode
+can read from and write to files. This can be done with a single
+application DAG by instrumenting a test mode using settings in the
+configuration that is passed to the application factory
+interface.</p>
+<p>Good test coverage will not only eliminate basic validation errors
+such as missing port connections or property constraint violations, but
+also validate the correct processing of the data. The same tests can be
+re-run whenever the application or its dependencies change (operator
+libraries, version of the platform etc.).</p>
+<h4 id="running-an-application">Running an application</h4>
+<p>The platform provides a command-line tool called <em>apex</em> for managing applications (launching,
+killing, viewing, etc.). This tool was already discussed briefly above
+in the section entitled Running a Test Application. It will introspect
+the jar file specified with the launch command for applications (classes
+that implement StreamingApplication) or properties files that define
+applications. It will also deploy the dependency jar files from the
+application package to the cluster.</p>
+<p>Apex CLI can run the application in local mode (i.e. outside a
+cluster). It is recommended to first run the application in local mode
+in the development environment before launching on the Hadoop cluster.
+This way some of the external system integration and correct
+functionality of the application can be verified in an easier to debug
+environment before testing distributed mode.</p>
+<p>For more details on CLI please refer to the <a href="../apex_cli/">Apex CLI Guide</a>.</p>
+<h2 id="application-api">Application API</h2>
+<p>This section introduces the API to write a streaming application.
+The work involves connecting operators via streams to form the logical
+DAG. The steps are</p>
+<ol>
+<li>
+<p>Instantiate an application (DAG)</p>
+</li>
+<li>
+<p>(Optional) Set Attributes</p>
+<ul>
+<li>Assign application name</li>
+<li>Set any other attributes as per application requirements</li>
+</ul>
+</li>
+<li>
+<p>Create/re-use and instantiate operators</p>
+<ul>
+<li>Assign operator name that is unique within the  application</li>
+<li>Declare schema upfront for each operator (and thereby its ports)</li>
+<li>(Optional) Set properties and attributes on the DAG as per specification</li>
+<li>Connect ports of operators via streams<ul>
+<li>Each stream connects one output port of an operator to one or  more input ports of other operators.</li>
+<li>(Optional) Set attributes on the streams</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<p>Test the application.</p>
+</li>
+</ol>
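+<p>The steps above can be pictured with a toy model of a logical plan. The sketch below is plain Java and deliberately does not use the platform's <code>DAG</code> class; <code>ToyDag</code>, its methods, and the "operator.port" naming are hypothetical. It only illustrates two rules from the list: operator names must be unique within the application, and each stream connects one output port to one or more input ports.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// A toy logical plan, NOT the platform's DAG class; every name is hypothetical.
class ToyDag {
  final Set<String> operators = new LinkedHashSet<>();
  // stream name -> [source port, sink port, sink port, ...], ports as "operator.port"
  final Map<String, List<String>> streams = new LinkedHashMap<>();

  // Registers an operator; names must be unique within the application.
  void addOperator(String name) {
    if (!operators.add(name)) {
      throw new IllegalArgumentException("duplicate operator name: " + name);
    }
  }

  // Connects one output port to one or more input ports.
  void addStream(String name, String source, String... sinks) {
    if (sinks.length == 0) {
      throw new IllegalArgumentException("stream needs at least one sink: " + name);
    }
    if (streams.containsKey(name)) {
      throw new IllegalArgumentException("duplicate stream name: " + name);
    }
    List<String> ports = new ArrayList<>();
    ports.add(source);
    for (String sink : sinks) {
      ports.add(sink);
    }
    for (String port : ports) {
      int dot = port.indexOf('.');
      if (dot < 0 || !operators.contains(port.substring(0, dot))) {
        throw new IllegalArgumentException("unknown operator in port: " + port);
      }
    }
    streams.put(name, ports);
  }

  public static void main(String[] args) {
    ToyDag dag = new ToyDag();
    dag.addOperator("tick");
    dag.addOperator("quote");
    dag.addOperator("highlow");
    dag.addStream("price", "tick.price", "quote.in1", "highlow.data");
    System.out.println(dag.streams);
  }
}
```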
+<p>There are several ways to specify an application: the Java API, a JSON
+file, and a properties file. The Java API is for applications being developed by humans,
+while the properties file (Hadoop-like) is more suited for DAGs generated by
+tools.</p>
+<h3 id="java-api">Java API</h3>
+<p>The Java API is the most common way to create a streaming
+application. It is meant for application developers who prefer to
+leverage the features of Java, and the ease of use and enhanced
+productivity provided by IDEs like NetBeans or Eclipse. Using Java to
+specify the application provides the extra validation abilities of the Java
+compiler, such as compile time checks for type safety at the time of
+writing the code. Later in this chapter you can read more about
+validation support in the platform.</p>
+<p>The developer specifies the streaming application by implementing
+the StreamingApplication interface, which is how platform tools (CLI etc.)
+recognize and instantiate applications. Here we show how to create a
+Yahoo! Finance application that streams the last trade price of a ticker
+and computes the high and low price in every 1 min window. Run the
+test application above to execute the
+DAG in local mode within the IDE.</p>
+<p>Let us revisit how the Yahoo! Finance test application constructs the DAG:</p>
+<pre><code class="java">public class Application implements StreamingApplication
+{
+
+  ...
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    dag.getAttributes().put(DAG.STRAM_WINDOW_SIZE_MILLIS, streamingWindowSizeMilliSeconds);
+
+    StockTickInput tick = getStockTickInputOperator(&quot;StockTickInput&quot;, dag);
+    SumKeyVal&lt;String, Long&gt; dailyVolume = getDailyVolumeOperator(&quot;DailyVolume&quot;, dag);
+    ConsolidatorKeyVal&lt;String,Double,Long,String,?,?&gt; quoteOperator = getQuoteOperator(&quot;Quote&quot;, dag);
+
+    RangeKeyVal&lt;String, Double&gt; highlow = getHighLowOperator(&quot;HighLow&quot;, dag, appWindowCountMinute);
+    SumKeyVal&lt;String, Long&gt; minuteVolume = getMinuteVolumeOperator(&quot;MinuteVolume&quot;, dag, appWindowCountMinute);
+    ConsolidatorKeyVal&lt;String,HighLow,Long,?,?,?&gt; chartOperator = getChartOperator(&quot;Chart&quot;, dag);
+
+    SimpleMovingAverage&lt;String, Double&gt; priceSMA = getPriceSimpleMovingAverageOperator(&quot;PriceSMA&quot;, dag, appWindowCountSMA);
+
+    dag.addStream(&quot;price&quot;, tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
+    dag.addStream(&quot;vol&quot;, tick.volume, dailyVolume.data, minuteVolume.data);
+    dag.addStream(&quot;time&quot;, tick.time, quoteOperator.in3);
+    dag.addStream(&quot;daily_vol&quot;, dailyVolume.sum, quoteOperator.in2);
+
+    dag.addStream(&quot;quote_data&quot;, quoteOperator.out, getConsole(&quot;quoteConsole&quot;, dag, &quot;QUOTE&quot;));
+
+    dag.addStream(&quot;high_low&quot;, highlow.range, chartOperator.in1);
+    dag.addStream(&quot;vol_1min&quot;, minuteVolume.sum, chartOperator.in2);
+    dag.addStream(&quot;chart_data&quot;, chartOperator.out, getConsole(&quot;chartConsole&quot;, dag, &quot;CHART&quot;));
+
+    dag.addStream(&quot;sma_price&quot;, priceSMA.doubleSMA, getConsole(&quot;priceSMAConsole&quot;, dag, &quot;Price SMA&quot;));
+  }
+}
+</code></pre>
+
+<h3 id="json-file-dag-specification">JSON File DAG Specification</h3>
+<p>In addition to Java, you can also specify the DAG using JSON, provided the operators in the DAG are present in the dependency jars. Create src/main/resources/app directory under your app package project, and put your JSON files there. This is the specification of a JSON file that specifies an application.</p>
+<p>Create a JSON file under src/main/resources/app, for example <code>myApplication.json</code>:</p>
+<pre><code>{
+  &quot;description&quot;: &quot;{application description}&quot;,
+  &quot;operators&quot;: [
+    {
+      &quot;name&quot;: &quot;{operator name}&quot;,
+      &quot;class&quot;: &quot;{fully qualified class name of the operator}&quot;,
+      &quot;properties&quot;: {
+        &quot;{property key}&quot;: &quot;{property value}&quot;,
+        ...
+      }
+    }, ...
+  ],
+  &quot;streams&quot;: [
+    {
+      &quot;name&quot;: &quot;{stream name}&quot;,
+      &quot;source&quot;: {
+        &quot;operatorName&quot;: &quot;{source operator name}&quot;,
+        &quot;portName&quot;: &quot;{source operator output port name}&quot;
+      },
+      &quot;sinks&quot;: [
+        {
+          &quot;operatorName&quot;: &quot;{sink operator name}&quot;,
+          &quot;portName&quot;: &quot;{sink operator input port name}&quot;
+        }, ...
+      ]
+    }, ...
+  ]
+}
+
+</code></pre>
+
+<ul>
+<li>The name of the JSON file is taken as the name of the application.</li>
+<li>The <code>description</code> field is the description of the application and is optional.</li>
+<li>The <code>operators</code> field is the list of operators the application has. You can specify the name, the Java class, and the properties of each operator here.</li>
+<li>The <code>streams</code> field is the list of streams that connects the operators together to form the DAG. Each stream consists of the stream name, the operator and port that it connects from, and the list of operators and ports that it connects to. Note that you can connect from <em>one</em> output port of an operator to <em>multiple</em> different input ports of different operators.</li>
+</ul>
+<p>In Apex Malhar, there is an <a href="https://github.com/apache/incubator-apex-malhar/blob/master/demos/pi/src/main/resources/app/PiJsonDemo.json">example</a> in the Pi Demo doing just that.</p>
+<h3 id="properties-file-dag-specification">Properties File DAG Specification</h3>
+<p>The platform also supports specification of a DAG via a properties
+file. The aim here is to make it easy for tools to create and run an
+application. This method of specification does not benefit from the
+compile-time checks of the Java compiler, but since these applications
+would be created by software, they should be correct by construction.
+The syntax is derived from Hadoop properties and should be easy for
+those who are used to creating software that integrates with
+Hadoop.</p>
+<p>Under the src/main/resources/app directory (create it if it doesn't exist), create a properties file,
+for example <code>myApplication.properties</code>:</p>
+<pre><code># input operator that reads from a file
+dt.operator.inputOp.classname=com.acme.SampleInputOperator
+dt.operator.inputOp.fileName=somefile.txt
+
+# output operator that writes to the console
+dt.operator.outputOp.classname=com.acme.ConsoleOutputOperator
+
+# stream connecting both operators
+dt.stream.inputStream.source=inputOp.outputPort
+dt.stream.inputStream.sinks=outputOp.inputPort
+</code></pre>
+
+<p>The above snippet is intended to convey the basic idea of specifying
+the DAG using a properties file. Operators would come from a predefined
+library and be referenced in the specification by class name and port names
+(obtained from the library provider's documentation or runtime
+introspection by tools). For those interested in details, see later
+sections and refer to the Operation and
+Installation Guide mentioned above.</p>
+<h3 id="attributes">Attributes</h3>
+<p>Attributes impact the runtime behavior of the application. They do
+not impact the functionality. An example of an attribute is application
+name. Setting it changes the application name. Another example is
+streaming window size. Setting it changes the streaming window size from
+the default value to the specified value. Users cannot add new
+attributes; they can only choose from the ones that come packaged and
+pre-supported by the platform. Details of attributes are covered in the
+Operation and Installation
+Guide.</p>
+<h2 id="operators">Operators</h2>
+<p>Operators are basic compute units.
+Operators process each incoming tuple and emit zero or more tuples on
+output ports as per the business logic. The data flow, connectivity,
+fault tolerance (node outage), etc. is taken care of by the platform. As
+an operator developer, all that is needed is to figure out what to do
+with the incoming tuple and when (and which output port) to send out a
+particular output tuple. Correctly designed operators will most likely
+get reused. Operator design needs care and foresight. For details, refer
+to the <a href="../operator_development/">Operator Developer Guide</a>. As an application developer you need to connect operators
+in a way that implements your business logic. You may also need to
+customize operators for functionality and use attributes for
+performance, scalability, etc.</p>
+<p>All operators process tuples asynchronously in a distributed
+cluster. An operator cannot assume or predict the exact time a tuple
+that it emitted will get consumed by a downstream operator. An operator
+also cannot predict the exact time when a tuple arrives from an upstream
+operator. The only guarantee is that the upstream operators are
+processing the current or a future window, i.e. the windowId of an upstream
+operator equals or exceeds its own windowId. Conversely the windowId
+of a downstream operator is less than or equal to its own windowId. The
+end of a window operation, i.e. the API call to endWindow on an operator,
+requires that all upstream operators have finished processing this
+window. This means that completion of processing a window propagates in
+a blocking fashion through an operator. Later sections provide more
+details on streams and data flow of tuples.</p>
+<p>Each operator has a unique name within the DAG as provided by the
+user. This is the name of the operator in the logical plan. The name of
+the operator in the physical plan is an integer assigned to it by STRAM.
+These integers are taken from the sequence 1 to N, where N is the total number
+of physically unique operators in the DAG. Following the same rule,
+each partitioned instance of a logical operator has its own integer as
+an id. This id along with the Hadoop container name uniquely identifies
+the operator in the execution plan of the DAG. The logical names and the
+physical names are required for web service support. Operators can be
+accessed via both names. These same names are used while interacting
+with apex cli to access an operator.
+Ideally these names should be self-descriptive. For example in Figure 1,
+the node named &quot;Daily volume&quot; has a physical identifier of 2.</p>
+<h3 id="operator-interface">Operator Interface</h3>
+<p>The operator interface in a DAG consists of ports, properties, and attributes.
+Operators interact with other components of the DAG via ports. Functional behavior of the operators
+can be customized via properties. Run-time performance and physical
+instantiation are controlled by attributes. Ports and properties are
+fields (variables) of the Operator class/object, while attributes are
+meta information that is attached to the operator object via an
+AttributeMap. An operator must have at least one port. Properties are
+optional. Attributes are provided by the platform and always have a
+default value that enables normal functioning of operators.</p>
+<h4 id="ports">Ports</h4>
+<p>Ports are connection points by which an operator receives and
+emits tuples. These should be transient objects instantiated in the
+operator object, that implement particular interfaces. Ports should be
+transient as they contain no state. They have a pre-defined schema and
+can only be connected to other ports with the same schema. An input port
+needs to implement the interface Operator.InputPort and the
+interface Sink. A default
+implementation of these is provided by the abstract class DefaultInputPort. An output port needs to
+implement the interface Operator.OutputPort. A default implementation
+of this is provided by the concrete class DefaultOutputPort. These two are a quick way to
+implement the above interfaces, but operator developers have the option
+of providing their own implementations.</p>
+<p>Here are examples of an input and an output port from the operator
+Sum.</p>
+<pre><code class="java">@InputPortFieldAnnotation(name = &quot;data&quot;)
+public final transient DefaultInputPort&lt;V&gt; data = new DefaultInputPort&lt;V&gt;() {
+  @Override
+  public void process(V tuple)
+  {
+    ...
+  }
+};
+@OutputPortFieldAnnotation(optional=true)
+public final transient DefaultOutputPort&lt;V&gt; sum = new DefaultOutputPort&lt;V&gt;(){ ... };
+</code></pre>
+
+<p>The process call is in the Sink interface. An emit on an output
+port is done via emit(tuple) call. For the above example it would be
+sum.emit(t), where the type of t is the generic parameter V.</p>
+<p>There is no limit on how many ports an operator can have. However
+any operator must have at least one port. An operator with only one port
+is called an Input Adapter if it has no input port and an Output Adapter
+if it has no output port. These are special operators needed to get/read
+data from outside system/source into the application, or push/write data
+into an outside system/sink. These could be in Hadoop or outside of
+Hadoop. These two operators are in essence gateways for the streaming
+application to communicate with systems outside the application.</p>
+<p>Port connectivity can be validated during compile time by adding
+the PortFieldAnnotations shown above. By default all ports have to be
+connected; to allow a port to go unconnected, you need to add
+&quot;optional=true&quot; to the annotation.</p>
+<p>Attributes can be specified for ports that affect the runtime
+behavior. An example of an attribute is parallel partition, which specifies
+a parallel computation flow per partition. It is described in detail in
+the Parallel Partitions section. Another example is queue capacity, which specifies the buffer size for the
+port. Details of attributes are covered in the Operation and Installation Guide.</p>
+<h4 id="properties">Properties</h4>
+<p>Properties are the abstractions by which functional behavior of an
+operator can be customized. They should be non-transient objects
+instantiated in the operator object. They need to be non-transient since
+they are part of the operator state and re-construction of the operator
+object from its checkpointed state must restore the operator to the
+desired state. Properties are optional, i.e. an operator may or may not
+have properties; they are part of user code and their values are not
+interpreted by the platform in any way.</p>
+<p>All non-serializable objects should be declared transient.
+Examples include sockets, session information, etc. These objects should
+be initialized during the setup call, which is invoked every time the
+operator is initialized.</p>
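+<p>The following is a minimal, self-contained sketch of this rule. It assumes plain Java
+serialization as a stand-in for the platform's checkpointing, which may use a different
+serialization mechanism. The class <code>FileReaderLike</code>, its fields, and the helper
+<code>checkpointAndRestore</code> are hypothetical and are not part of the Apex API.</p>
+<pre><code class="java">import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class CheckpointSketch {
+
+  // Hypothetical operator-like class; not an Apex type.
+  static class FileReaderLike implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // Property: part of the operator state, so it is non-transient.
+    private String fileName = &quot;somefile.txt&quot;;
+
+    // Stand-in for a non-serializable resource such as a socket:
+    // declared transient and rebuilt in setup().
+    private transient Object connection;
+
+    void setFileName(String fileName) { this.fileName = fileName; }
+    String getFileName() { return fileName; }
+    boolean isConnected() { return connection != null; }
+
+    // Stands in for the setup call made every time the operator is initialized.
+    void setup() { connection = new Object(); }
+  }
+
+  // Serialize and deserialize the object, standing in for checkpoint and recovery.
+  static FileReaderLike checkpointAndRestore(FileReaderLike op)
+      throws IOException, ClassNotFoundException {
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+      out.writeObject(op);
+    }
+    try (ObjectInputStream in =
+        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+      return (FileReaderLike) in.readObject();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    FileReaderLike op = new FileReaderLike();
+    op.setFileName(&quot;input.txt&quot;);
+    op.setup();
+
+    FileReaderLike restored = checkpointAndRestore(op);
+    System.out.println(restored.getFileName()); // prints input.txt: the property survives
+    System.out.println(restored.isConnected()); // prints false: the transient field was not saved
+    restored.setup();
+    System.out.println(restored.isConnected()); // prints true: setup() rebuilt the resource
+  }
+}
+</code></pre>
+
+<p>If the <code>connection</code> field were not transient, serialization in this sketch would fail
+because <code>Object</code> is not serializable, which is the reason such fields are kept out of the
+operator state.</p>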
+<h4 id="attributes_1">Attributes</h4>
+<p>Attributes are values assigned to the operators that impact
+run-time. This includes things like the number of partitions, at most
+once or at least once or exactly once recovery modes, etc. Attributes do
+not impact functionality of the operator. Users can change certain
+attributes at run time. Users cannot add attributes to operators; they
+are pre-defined by the platform. They are interpreted by the platform
+and thus cannot be defined in user-created code (like properties).
+Details of attributes are covered in the <a href="http://docs.datatorrent.com/configuration/">Configuration Guide</a>.</p>
+<h3 id="operator-state">Operator State</h3>
+<p>The state of an operator is defined as the data that it transfers
+from one window to a future window. Since the computing model of the
+platform is to treat windows like micro-batches, the operator state can
+be checkpointed every Nth window, or every T units of time, where T is significantly greater
+than the streaming window. When an operator is checkpointed, the entire
+object is written to HDFS. The larger the amount of state in an
+operator, the longer it takes to recover from a failure. A stateless
+operator can recover much quicker than a stateful one. The needed
+windows are preserved by the upstream buffer server and are used to
+recompute the lost windows, and also rebuild the buffer server in the
+current container.</p>
+<p>The distinction between Stateless and Stateful is based solely on
+the need to transfer data in the operator from one window to the next.
+The state of an operator is independent of the number of ports.</p>
+<h4 id="stateless">Stateless</h4>
+<p>A Stateless operator is defined as one where no data is needed to
+be kept at the end of every window. This means that all the computations
+of a window can be derived from all the tuples the operator receives
+within that window. This guarantees that the output of any window can be
+reconstructed by simply replaying the tuples that arrived in that
+window. Stateless operators are more efficient in terms of fault
+tolerance, and cost to achieve SLA.</p>
+<h4 id="stateful">Stateful</h4>
+<p>A Stateful operator is defined as one where data needs to be
+stored at the end of a window for computations occurring in a later
+window; a common example is the computation of a running sum of values in the
+input tuples.</p>
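+<p>The difference between the two kinds of operator can be sketched in plain Java as follows.
+The classes <code>WindowSum</code> and <code>RunningSum</code> are hypothetical and do not
+extend any Apex type; for brevity their <code>endWindow</code> methods return the value that a
+real operator would emit on an output port.</p>
+<pre><code class="java">public class StateSketch {
+
+  // Stateless: the sum is rebuilt from scratch in every window,
+  // so nothing has to be kept once the window ends.
+  static class WindowSum {
+    private long sum;
+    void beginWindow(long windowId) { sum = 0; }
+    void process(long value) { sum += value; }
+    long endWindow() { return sum; }
+  }
+
+  // Stateful: the running sum is carried from one window to the next,
+  // so it has to be part of the checkpointed state.
+  static class RunningSum {
+    private long sum;
+    void beginWindow(long windowId) { }
+    void process(long value) { sum += value; }
+    long endWindow() { return sum; }
+  }
+
+  public static void main(String[] args) {
+    long[][] windows = { {1, 2, 3}, {10, 20} };
+    WindowSum stateless = new WindowSum();
+    RunningSum stateful = new RunningSum();
+    for (int w = 0; w &lt; windows.length; w++) {
+      stateless.beginWindow(w);
+      stateful.beginWindow(w);
+      for (long v : windows[w]) {
+        stateless.process(v);
+        stateful.process(v);
+      }
+      // window 0: stateless=6, stateful=6
+      // window 1: stateless=30, stateful=36
+      System.out.println(&quot;window &quot; + w + &quot;: stateless=&quot; + stateless.endWindow()
+          + &quot;, stateful=&quot; + stateful.endWindow());
+    }
+  }
+}
+</code></pre>
+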
+<h3 id="operator-api">Operator API</h3>
+<p>The Operator API consists of methods that operator developers may
+need to override. In this section we will discuss the Operator APIs from
+the point of view of an application developer. Knowledge of how an
+operator works internally is critical for writing an application. Those
+interested in the details should refer to the Malhar Operator Developer Guide.</p>
+<p>The APIs are available in three modes, namely Single Streaming
+Window, Sliding Application Window, and Aggregate Application Window.
+These are not mutually exclusive, i.e. an operator can use single
+streaming window as well as sliding application window. A physical
+instance of an operator is always processing tuples from a single
+window. The processing of tuples is guaranteed to be sequential, no
+matter which input port the tuples arrive on.</p>
+<p>In the later part of this section we will evaluate three common
+uses of streaming windows by applications. They have different
+characteristics and implications on optimization and recovery mechanisms
+(i.e. algorithm used to recover a node after outage) as discussed later
+in the section.</p>
+<h4 id="streaming-window">Streaming Window</h4>
+<p>A streaming window is an atomic micro-batch computation period. The API
+methods relating to a streaming window are as follows:</p>
+<pre><code class="java">public void process(&lt;tuple_type&gt; tuple) // Called on the input port on which the tuple arrives
+public void beginWindow(long windowId) // Called at the start of the window as soon as the first begin_window tuple arrives
+public void endWindow() // Called at the end of the window after end_window tuples arrive on all input ports
+public void setup(OperatorContext context) // Called once during initialization of the operator
+public void teardown() // Called once when the operator is being shutdown
+</code></pre>
+
+<p>A tuple can be emitted in any of the three streaming run-time
+calls, namely beginWindow, process, and endWindow but not in setup or
+teardown.</p>
+<h4 id="aggregate-application-window">Aggregate Application Window</h4>
+<p>An operator with an aggregate window is stateful within the
+application window timeframe and possibly stateless at the end of that
+application window. The size of an aggregate application window is an
+operator attribute and is defined as a multiple of the streaming window
+size. The platform recognizes this attribute and optimizes the operator.
+The beginWindow and endWindow calls are not invoked for those streaming
+windows that do not align with the application window. For example, in
+the case of a streaming window of 0.5 seconds and an application window of 5
+minutes, an application window spans 600 streaming windows (5*60*2 =
+600). At the start of the sequence of these 600 atomic streaming
+windows, a beginWindow gets invoked, and at the end of these 600
+streaming windows an endWindow gets invoked. All the intermediate
+streaming windows do not invoke beginWindow or endWindow. Bookkeeping,
+node recovery, stats, UI, etc. continue to work off streaming windows.
+For example if operators are being checkpointed say on an average every
+30th window, then the above application window would have about 20
+checkpoints.</p>
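+<p>The arithmetic in this example can be written out as a short sketch. The class and method
+names are hypothetical helpers for illustration only and are not part of the platform API.</p>
+<pre><code class="java">public class WindowMath {
+
+  // Number of streaming windows spanned by one application window.
+  static long streamingWindowsPerApplicationWindow(long streamingWindowMillis,
+                                                   long applicationWindowMillis) {
+    if (streamingWindowMillis &lt;= 0 || applicationWindowMillis % streamingWindowMillis != 0) {
+      throw new IllegalArgumentException(
+          &quot;application window must be a positive multiple of the streaming window&quot;);
+    }
+    return applicationWindowMillis / streamingWindowMillis;
+  }
+
+  // Approximate number of checkpoints taken within one application window
+  // when a checkpoint happens every Nth streaming window.
+  static long checkpointsPerApplicationWindow(long streamingWindows, long checkpointEveryNWindows) {
+    return streamingWindows / checkpointEveryNWindows;
+  }
+
+  public static void main(String[] args) {
+    long streamingWindows = streamingWindowsPerApplicationWindow(500, 5 * 60 * 1000);
+    System.out.println(streamingWindows); // prints 600
+    System.out.println(checkpointsPerApplicationWindow(streamingWindows, 30)); // prints 20
+  }
+}
+</code></pre>
+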
+<h4 id="sliding-application-window">Sliding Application Window</h4>
+<p>A sliding window is a computation that requires the previous N
+streaming windows. After each streaming window the Nth past window is
+dropped and the new window is added to the computation. An operator with
+a sliding window is a stateful operator at the end of any window. The sliding
+window period is an attribute and is a multiple of the streaming window. The
+platform recognizes this attribute and leverages it during bookkeeping.
+A sliding aggregate window with tolerance to data loss does not have a
+very high bookkeeping cost. The cost of all three recovery mechanisms,
+at most once (data loss tolerant),
+at least once (data loss
+intolerant), and exactly once (data
+loss intolerant and no extra computations), is the same as for recovery
+mechanisms based on the streaming window. STRAM is not able to leverage this
+operator for any extra optimization.</p>
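+<p>As an illustration of the drop-oldest, add-newest behavior described above, here is a minimal
+sketch of a sum over the last N streaming windows in plain Java. The class
+<code>SlidingWindowSketch</code> is hypothetical and does not use the platform's sliding window
+support; it only shows the state such an operator has to carry between windows.</p>
+<pre><code class="java">import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class SlidingWindowSketch {
+  private final int windowCount;                                  // N: streaming windows kept
+  private final Deque&lt;Long&gt; perWindowSums = new ArrayDeque&lt;&gt;();   // one partial sum per window
+  private long current;                                           // sum of the window in progress
+  private long total;                                             // sum over the retained windows
+
+  SlidingWindowSketch(int windowCount) {
+    this.windowCount = windowCount;
+  }
+
+  void process(long value) {
+    current += value;
+  }
+
+  // Called at the end of each streaming window: adds the new window,
+  // drops the oldest one if more than N are held, and returns the sliding sum.
+  long endWindow() {
+    perWindowSums.addLast(current);
+    total += current;
+    current = 0;
+    if (perWindowSums.size() &gt; windowCount) {
+      total -= perWindowSums.removeFirst();
+    }
+    return total;
+  }
+
+  public static void main(String[] args) {
+    SlidingWindowSketch op = new SlidingWindowSketch(2);
+    op.process(1);
+    op.process(2);
+    System.out.println(op.endWindow()); // prints 3
+    op.process(3);
+    System.out.println(op.endWindow()); // prints 6
+    op.process(4);
+    op.process(5);
+    System.out.println(op.endWindow()); // prints 12: the first window (3) was dropped
+  }
+}
+</code></pre>
+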
+<h3 id="single-vs-multi-input-operator">Single vs Multi-Input Operator</h3>
+<p>A single-input operator by definition has a single upstream
+operator, since there can only be one writing port for a stream. If an
+operator has a single upstream operator, then the beginWindow on the
+upstream also blocks the beginWindow of the single-input operator. For
+an operator to start processing any window, at least one upstream
+operator has to start processing that window. A multi-input operator
+reads from more than one upstream port. Such an operator would start
+processing as soon as the first begin_window event arrives. However the
+window would not close (i.e. invoke endWindow) until all ports receive
+end_window events for that windowId. Thus the end of a window is a
+blocking event. As we saw earlier, a multi-input operator is also the
+point in the DAG where windows of all upstream operators are
+synchronized. The windows (atomic micro-batches) from faster (or just
+ahead in processing) upstream operators are queued up until the slower
+upstream operator catches up. STRAM monitors such bottlenecks and takes
+corrective actions. The platform ensures minimal delay, i.e. processing
+starts as long as at least one upstream operator has started
+processing.</p>
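+<p>The open-on-first, close-on-last rule for a multi-input operator can be sketched as a small
+barrier in plain Java. The class <code>EndWindowBarrier</code> is hypothetical and much simpler
+than the platform's implementation: it tracks a single window at a time and does not model the
+queuing of windows from a faster upstream operator.</p>
+<pre><code class="java">import java.util.HashSet;
+import java.util.Set;
+
+public class EndWindowBarrier {
+  private final int portCount;                                // number of connected input ports
+  private final Set&lt;Integer&gt; portsDone = new HashSet&lt;&gt;();     // ports that delivered end_window
+  private boolean windowOpen;
+
+  EndWindowBarrier(int portCount) {
+    this.portCount = portCount;
+  }
+
+  // Returns true if this begin_window event opened the window,
+  // i.e. it was the first one to arrive on any port.
+  boolean onBeginWindow(int port) {
+    if (windowOpen) {
+      return false;
+    }
+    windowOpen = true;
+    return true;
+  }
+
+  // Returns true if this end_window event closed the window,
+  // i.e. every port has now delivered its end_window.
+  boolean onEndWindow(int port) {
+    portsDone.add(port);
+    if (portsDone.size() &lt; portCount) {
+      return false;
+    }
+    portsDone.clear();
+    windowOpen = false;
+    return true;
+  }
+
+  public static void main(String[] args) {
+    EndWindowBarrier barrier = new EndWindowBarrier(2);
+    System.out.println(barrier.onBeginWindow(0)); // prints true: first begin_window opens the window
+    System.out.println(barrier.onBeginWindow(1)); // prints false: window is already open
+    System.out.println(barrier.onEndWindow(0));   // prints false: port 1 is still pending
+    System.out.println(barrier.onEndWindow(1));   // prints true: all ports done, endWindow may run
+  }
+}
+</code></pre>
+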
+<h3 id="recovery-mechanisms">Recovery Mechanisms</h3>
+<p>Application developers can set any of the recovery mechanisms
+below to deal with node outage. In general, the cost of recovery depends
+on the state of the operator, while data integrity is dependent on the
+application. The mechanisms are per window as the platform treats
+windows as atomic compute units. Three recovery mechanisms are
+supported, namely:</p>
+<ul>
+<li>At-least-once: All atomic batches are processed at least once.
+    No data loss occurs.</li>
+<li>At-most-once: All atomic batches are processed at most once.
+    Data loss is possible; this is the most efficient setting.</li>
+<li>Exactly-once: All atomic batches are processed exactly once.
+    No data loss occurs; this is the least efficient setting since
+    additional work is needed to ensure proper semantics.</li>
+</ul>
+<p>At-least-once is the default. During a recovery event, the
+operator connects to the upstream buffer server and asks for windows to
+be replayed. With the at-least-once and exactly-once mechanisms, the operator
+restarts from its checkpointed state. With at-most-once, it starts from the next
+begin-window event.</p>
+<p>Recovery mechanisms can be specified per Operator while writing
+the application as shown below.</p>
+<pre><code class="java">Operator o = dag.addOperator(&quot;operator&quot;, ...);
+dag.setAttribute(o, OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE);
+</code></pre>
+
+<p>Also note that once an operator is set to AT_MOST_ONCE,
+all the operators downstream of it have to be AT_MOST_ONCE. The client
+will give appropriate warnings or errors if that's not the case.</p>
+<p>Details are explained in the chapter on Fault Tolerance below.</p>
+<h2 id="streams">Streams</h2>
+<p>A stream is a connector
+(edge) abstraction, and is a fundamental building block of the platform.
+A stream consists of tuples that flow from one port (called the
+output port) to one or more ports
+on other operators (called input ports). Note a potentially
+confusing aspect of this terminology: tuples enter a stream through its
+output port and leave via one or more input ports. A stream has the
+following characteristics:</p>
+<ul>
+<li>Tuples are always delivered in the same order in which they
+    were emitted.</li>
+<li>Consists of a sequence of windows one after another. Each
+    window being a collection of in-order tuples.</li>
+<li>A stream that connects two containers passes through a
+    buffer server.</li>
+<li>All streams can be persisted (by default in HDFS).</li>
+<li>Exactly one output port writes to the stream.</li>
+<li>Can be read by one or more input ports.</li>
+<li>Connects operators within an application, not outside
+    an application.</li>
+<li>Has a unique name within an application.</li>
+<li>Has attributes which act as hints to STRAM.</li>
+<li>
+<p>Streams have locality modes, namely in-line, in-container, in-node,
+    in-rack, and unspecified. Modes may be overruled (for example due to lack
+    of containers). They are defined as follows:</p>
+<ul>
+<li>THREAD_LOCAL: In the same thread, uses thread
+    stack (intra-thread). This mode can only be used for a downstream
+    operator which has only one input port connected; also called
+    in-line.</li>
+<li>CONTAINER_LOCAL: In the same container (intra-process); also
+    called in-container.</li>
+<li>NODE_LOCAL: In the same Hadoop node (inter processes, skips
+    NIC); also called in-node.</li>
+<li>RACK_LOCAL: On nodes in the same rack; also called
+    in-rack.</li>
+<li>unspecified: No guarantee. Could be anywhere within the
+    cluster.</li>
+</ul>
+</li>
+</ul>
+<p>An example of a stream declaration is given below</p>
+<pre><code class="java">DAG dag = new DAG();
+// ...
+dag.addStream(&quot;views&quot;, viewAggregate.sum, cost.data).setLocality(CONTAINER_LOCAL); // A container-local stream
+dag.addStream(&quot;clicks&quot;, clickAggregate.sum, rev.data); // An example of unspecified locality
+</code></pre>
+
+<p>The platform guarantees in-order delivery of tuples in a stream.
+STRAM views each stream as a collection of ordered windows. Since no tuple
+can exist outside a window, a replay of a stream consists of a replay of a
+set of windows. When multiple input ports read the same stream, the
+execution plan of a stream ensures that each input port is logically not
+blocked by the reading of another input port. The schema of a stream is
+the same as the schema of the tuple.</p>
+<p>In a stream all tuples emitted by an operator in a window belong
+to that window. A replay of this window would consist of an in-order
+replay of all the tuples. Thus the tuple order within a stream is
+guaranteed. However since an operator may receive multiple streams (for
+example an operator with two input ports), the order of arrival of two
+tuples belonging to different streams is not guaranteed. In general in
+an asynchronous distributed architecture this is expected. Thus the
+operator (especially one with multiple input ports) should not depend on
+the tuple order from two streams. One way to cope with this
+indeterminate order, if necessary, is to wait to get all the tuples of a
+window and emit results in the endWindow call. All operator templates
+provided as part of the Malhar operator library follow these principles.</p>
+<p>A logical stream gets partitioned into physical streams, each
+connecting a partition to the upstream operator. If two different
+attributes are needed on the same stream, it should be split using
+the StreamDuplicator operator.</p>
+<p>Modes of the streams are critical for performance. An in-line
+stream is the most efficient as it simply delivers the tuple as-is without
+serialization-deserialization. Streams should be marked
+CONTAINER_LOCAL, especially where there is a large tuple volume
+between two operators that drops significantly downstream. Since the
+setLocality call merely provides a hint, STRAM may ignore it. An in-node
+stream is not as efficient as an in-line one, but it is clearly better
+than going off-node since it still avoids the potential bottleneck of
+the network card.</p>
+<p>THREAD_LOCAL and CONTAINER_LOCAL streams do not use a buffer
+server since such streams stay within a single process. The other two do.</p>
+<h2 id="affinity-rules">Affinity Rules</h2>
+<p>Affinity Rules in Apex provide a way to specify hints on how operators should be deployed in a cluster. Sometimes you may want to allocate certain operators on the same or different nodes for performance or other reasons. Affinity rules can be used in such cases to make sure these considerations are honored by the platform.</p>
+<p>There are two types of rules: affinity and anti-affinity rules. An affinity rule indicates that the group of operators in the rule should be allocated together. An anti-affinity rule, on the other hand, indicates that the group of operators should be allocated separately.</p>
+<h3 id="specifying-affinity-rules">Specifying Affinity Rules</h3>
+<p>A list of Affinity Rules can be specified for an application by setting the attribute DAGContext.AFFINITY_RULES_SET.
+Here is an example of setting affinity rules for an application in the populateDag method:</p>
+<pre><code class="java">AffinityRulesSet ruleSet = new AffinityRulesSet();
+List&lt;AffinityRule&gt; rules = new ArrayList&lt;&gt;();
+// Add Affinity rules as per requirement
+rules.add(new AffinityRule(Type.ANTI_AFFINITY, Locality.NODE_LOCAL, false, &quot;rand&quot;, &quot;operator1&quot;, &quot;operator2&quot;));
+rules.add(new AffinityRule(Type.AFFINITY, Locality.CONTAINER_LOCAL, false, &quot;console&quot;, &quot;rand&quot;));
+ruleSet.setAffinityRules(rules);
+dag.setAttribute(DAGContext.AFFINITY_RULES_SET, ruleSet);
+</code></pre>
+
+<p>As shown in the example above, each rule has a type, AFFINITY or ANTI_AFFINITY, indicating whether operators in the group should be allocated together or separately. These can be applied to any operators in the DAG.</p>
+<p>The operators for a rule can be provided either as a list of two or more operator names or as a regular expression. The regex should match at least two operators in the DAG to be considered a valid rule. Here is an example of a rule with a regex to allocate all the operators in the DAG on the same node:</p>
+<pre><code class="java">// Allocate all operators starting with console on same container
+rules.add(new AffinityRule(Type.AFFINI

<TRUNCATED>