You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/04 17:15:13 UTC

[05/11] incubator-apex-core git commit: Migrating docs

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/7af835b0/application_development.md
----------------------------------------------------------------------
diff --git a/application_development.md b/application_development.md
deleted file mode 100644
index a76ad8a..0000000
--- a/application_development.md
+++ /dev/null
@@ -1,2934 +0,0 @@
-Application Developer Guide
-===========================
-
-Real-time big data processing is not only important but has become
-critical for businesses which depend on accurate and timely analysis of
-their business data. A few businesses have yielded to very expensive
-solutions like building an in-house, real-time analytics infrastructure
-supported by an internal development team, or buying expensive
-proprietary software. A large number of businesses are dealing with the
-requirement just by trying to make Hadoop do their batch jobs in smaller
-iterations. Over the last few years, Hadoop has become ubiquitous in the
-big data processing space, replacing expensive proprietary hardware and
-software solutions for massive data processing with very cost-effective,
-fault-tolerant, open-sourced, and commodity-hardware-based solutions.
-While Hadoop has been a game changer for companies, it is primarily a
-batch-oriented system, and does not yet have a viable option for
-real-time data processing.  Most companies with real-time data
-processing end up having to build customized solutions in addition to
-their Hadoop infrastructure.
-
- 
-
-The DataTorrent platform is designed to process massive amounts of
-real-time events natively in Hadoop. This can be event ingestion,
-processing, and aggregation for real-time data analytics, or can be
-real-time business logic decisioning such as cell tower load balancing,
-real-time ads bidding, or fraud detection.  The platform has the ability
-to repair itself in real-time (without data loss) if hardware fails, and
-adapt to changes in load by adding and removing computing resources
-automatically.
-
-
-
-DataTorrent is a native Hadoop application. It runs as a YARN
-(Hadoop 2.x) application and leverages Hadoop as a distributed operating
-system. All the basic distributed operating system capabilities of
-Hadoop like resource allocation (Resource Manager, distributed file system (HDFS),
-multi-tenancy, security, fault-tolerance, scalability, etc.
-are supported natively in all streaming applications.  Just as Hadoop
-for map-reduce handles all the details of the application allowing you
-to only focus on writing the application (the mapper and reducer
-functions), the platform handles all the details of streaming execution,
-allowing you to only focus on your business logic. Using the platform
-removes the need to maintain separate clusters for real-time
-applications.
-
-
-
-In the platform, building a streaming application can be extremely
-easy and intuitive.  The application is represented as a Directed
-Acyclic Graph (DAG) of computation units called *Operators* interconnected
-by the data-flow edges called  *Streams*. The operators process input
-streams and produce output streams. A library of common operators is
-provided to enable quick application development.  In case the desired
-processing is not available in the Operator Library, one can easily
-write a custom operator. We refer those interested in creating their own
-operators to the [Operator Development Guide](operator_development.md).
-
-Running A Test Application
-=======================================
-
-This chapter will help you with a quick start on running an
-application. If you are starting with the platform for the first time,
-it would be informative to open an existing application and see it run.
-Do the following steps to run the PI demo, which computes the value of
-PI  in a simple
-manner:
-
-1.  Open up platform files in your IDE (for example NetBeans, or Eclipse)
-2.  Open Demos project
-3.  Open Test Packages and run ApplicationTest.java in pi package
-4.  See the results in your system console
-
-
-
-Congratulations, you just ran your first real-time streaming demo
-:) This demo is very simple and has four operators. The first operator
-emits random integers between 0 to 30, 000. The second operator receives
-these coefficients and emits a hashmap with x and y values each time it
-receives two values. The third operator takes these values and computes
-x\*\*2+y\*\*2. The last operator counts how many computed values from
-the previous operator were less than or equal to 30, 000\*\*2. Assuming
-this count is N, then PI is computed as N/number of values received.
-Here is the code snippet for the PI application. This code populates the
-DAG. Do not worry about what each line does, we will cover these
-concepts later in this document.
-
-
-```java
-// Generates random numbers
-RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
-rand.setMinvalue(0);
-rand.setMaxvalue(30000);
-
-// Generates a round robin HashMap of "x" and "y"
-RoundRobinHashMap<String,Object> rrhm = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>());
-rrhm.setKeys(new String[] { "x", "y" });
-
-// Calculates pi from x and y
-JavaScriptOperator calc = dag.addOperator("picalc", new Script());
-calc.setPassThru(false);
-calc.put("i",0);
-calc.put("count",0);
-calc.addSetupScript("function pi() { if (x*x+y*y <= "+maxValue*maxValue+") { i++; } count++; return i / count * 4; }");
-calc.setInvoke("pi");
-dag.addStream("rand_rrhm", rand.integer_data, rrhm.data);
-dag.addStream("rrhm_calc", rrhm.map, calc.inBindings);
-
-// puts results on system console
-ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
-dag.addStream("rand_console",calc.result, console.input);
-```
-
-
-You can review the other demos and see what they do. The examples
-given in the Demos project cover various features of the platform and we
-strongly encourage you to read these to familiarize yourself with the
-platform. In the remaining part of this document we will go through
-details needed for you to develop and run streaming applications in
-Malhar.
-
-Test Application: Yahoo! Finance Quotes
-----------------------------------------------------
-
-The PI application was to
-get you started. It is a basic application and does not fully illustrate
-the features of the platform. For the purpose of describing concepts, we
-will consider the test application shown in Figure 1. The application
-downloads tick data from  [Yahoo! Finance](http://finance.yahoo.com)  and computes the
-following for four tickers, namely [IBM](http://finance.yahoo.com/q?s=IBM),
-[GOOG](http://finance.yahoo.com/q?s=GOOG), [YHOO](http://finance.yahoo.com/q?s=YHOO).
-
-1.  Quote: Consisting of last trade price, last trade time, and
-    total volume for the day
-2.  Per-minute chart data: Highest trade price, lowest trade
-    price, and volume during that minute
-3.  Simple Moving Average: trade price over 5 minutes
-
-
-Total volume must ensure that all trade volume for that day is
-added, i.e. data loss would result in wrong results. Charting data needs
-all the trades in the same minute to go to the same slot, and then on it
-starts afresh, so again data loss would result in wrong results. The
-aggregation for charting data is done over 1 minute. Simple moving
-average computes the average price over a 5 minute sliding window; it
-too would produce wrong results if there is data loss. Figure 1 shows
-the application with no partitioning.
-
-
-
-![](images/application_development/ApplicationDeveloperGuide.html-image00.png)
-
-
-
-The operator StockTickerInput: StockTickerInput[ ](http://docs.google.com/../apidocs/com/datatorrent/demos/yahoofinance/StockTickInput.html)is
-the input operator that reads live data from Yahoo! Finance once per
-interval (user configurable in milliseconds), and emits the price, the
-incremental volume, and the last trade time of each stock symbol, thus
-emulating real ticks from the exchange.  We utilize the Yahoo! Finance
-CSV web service interface.  For example:
-
-
-```
-$ GET 'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1'
-"IBM",203.966,1513041,"1:43pm"
-"GOOG",762.68,1879741,"1:43pm"
-"AAPL",444.3385,11738366,"1:43pm"
-"YHOO",19.3681,14707163,"1:43pm"
-```
-
-
-Among all the operators in Figure 1, StockTickerInput is the only
-operator that requires extra code because it contains a custom mechanism
-to get the input data.  Other operators are used unchanged from the
-Malhar library.
-
-
-Here is the class implementation for StockTickInput:
-
-
-```java
-package com.datatorrent.demos.yahoofinance;
-
-import au.com.bytecode.opencsv.CSVReader;
-import com.datatorrent.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.util.KeyValPair;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.*;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpStatus;
-import org.apache.commons.httpclient.cookie.CookiePolicy;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.params.DefaultHttpParams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This operator sends price, volume and time into separate ports and calculates incremental volume.
- */
-public class StockTickInput implements InputOperator
-{
-  private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);
-  /**
-   * Timeout interval for reading from server. 0 or negative indicates no timeout.
-   */
-  public int readIntervalMillis = 500;
-  /**
-   * The URL of the web service resource for the POST request.
-   */
-  private String url;
-  public String[] symbols;
-  private transient HttpClient client;
-  private transient GetMethod method;
-  private HashMap<String, Long> lastVolume = new HashMap<String, Long>();
-  private boolean outputEvenIfZeroVolume = false;
-  /**
-   * The output port to emit price.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>();
-  /**
-   * The output port to emit incremental volume.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>();
-  /**
-   * The output port to emit last traded time.
-   */
-  @OutputPortFieldAnnotation(optional = true)
-  public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>();
-
-  /**
-   * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1
-   *
-   * @return the URL
-   */
-  private String prepareURL()
-  {
-    String str = "http://download.finance.yahoo.com/d/quotes.csv?s=";
-    for (int i = 0; i < symbols.length; i++) {
-      if (i != 0) {
-        str += ",";
-      }
-      str += symbols[i];
-    }
-    str += "&f=sl1vt1&e=.csv";
-    return str;
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    url = prepareURL();
-    client = new HttpClient();
-    method = new GetMethod(url);
-    DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY);
-  }
-
-  @Override
-  public void teardown()
-  {
-  }
-
-  @Override
-  public void emitTuples()
-  {
-
-    try {
-      int statusCode = client.executeMethod(method);
-      if (statusCode != HttpStatus.SC_OK) {
-        System.err.println("Method failed: " + method.getStatusLine());
-      }
-      else {
-        InputStream istream = method.getResponseBodyAsStream();
-        // Process response
-        InputStreamReader isr = new InputStreamReader(istream);
-        CSVReader reader = new CSVReader(isr);
-        List<String[]> myEntries = reader.readAll();
-        for (String[] stringArr: myEntries) {
-          ArrayList<String> tuple = new ArrayList<String>(Arrays.asList(stringArr));
-          if (tuple.size() != 4) {
-            return;
-          }
-          // input csv is <Symbol>,<Price>,<Volume>,<Time>
-          String symbol = tuple.get(0);
-          double currentPrice = Double.valueOf(tuple.get(1));
-          long currentVolume = Long.valueOf(tuple.get(2));
-          String timeStamp = tuple.get(3);
-          long vol = currentVolume;
-          // Sends total volume in first tick, and incremental volume afterwards.
-          if (lastVolume.containsKey(symbol)) {
-            vol -= lastVolume.get(symbol);
-          }
-
-          if (vol > 0 || outputEvenIfZeroVolume) {
-            price.emit(new KeyValPair<String, Double>(symbol, currentPrice));
-            volume.emit(new KeyValPair<String, Long>(symbol, vol));
-            time.emit(new KeyValPair<String, String>(symbol, timeStamp));
-            lastVolume.put(symbol, currentVolume);
-          }
-        }
-      }
-      Thread.sleep(readIntervalMillis);
-    }
-    catch (InterruptedException ex) {
-      logger.debug(ex.toString());
-    }
-    catch (IOException ex) {
-      logger.debug(ex.toString());
-    }
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-  }
-
-  @Override
-  public void endWindow()
-  {
-  }
-
-  public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume)
-  {
-	   this.outputEvenIfZeroVolume = outputEvenIfZeroVolume;
-  }
-
-}
-```
-
-
-
-The operator has three output ports that emit the price of the
-stock, the volume of the stock and the last trade time of the stock,
-declared as public member variables price, volume and  time of the class.  The tuple of the
-price output port is a key-value
-pair with the stock symbol being the key, and the price being the value.
- The tuple of the volume output
-port is a key value pair with the stock symbol being the key, and the
-incremental volume being the value.  The tuple of the  time output port is a key value pair with the
-stock symbol being the key, and the last trade time being the
-value.
-
-
-
-Important: Since operators will be
-serialized, all input and output ports need to be declared transient
-because they are stateless and should not be serialized.
-
-
-
-The method setup(OperatorContext)
-contains the code that is necessary for setting up the HTTP
-client for querying Yahoo! Finance.
-
-
-
-Method emitTuples() contains
-the code that reads from Yahoo! Finance, and emits the data to the
-output ports of the operator.  emitTuples() will be called one or more times
-within one application window as long as time is allowed within the
-window.
-
-
-
-Note that we want to emulate the tick input stream by having
-incremental volume data with Yahoo! Finance data.  We therefore subtract
-the previous volume from the current volume to emulate incremental
-volume for each tick.
-
-
-
-The operator
-DailyVolume: This operator
-reads from the input port, which contains the incremental volume tuples
-from StockTickInput, and
-aggregates the data to provide the cumulative volume.  It uses the
-library class  SumKeyVal&lt;K,V&gt; provided in math package.  In this case,
-SumKeyVal&lt;String,Long&gt;, where K is the stock symbol, V is the
-aggregated volume, with cumulative
-set to true. (Otherwise if  cumulativewas set to false, SumKeyVal would
-provide the sum for the application window.)  Malhar provides a number
-of built-in operators for simple operations like this so that
-application developers do not have to write them.  More examples to
-follow. This operator assumes that the application restarts before the
-market opens every day.
-
-
-
-The operator Quote:
-This operator has three input ports, which are price (from
-StockTickInput), daily_vol (from
-Daily Volume), and time (from
- StockTickInput).  This operator
-just consolidates the three data items and and emits the consolidated
-data.  It utilizes the class ConsolidatorKeyVal&lt;K&gt; from the
-stream package.
-
-
-
-The operator HighLow: This operator reads from the input port,
-which contains the price tuples from StockTickInput, and provides the high and the
-low price within the application window.  It utilizes the library class
- RangeKeyVal&lt;K,V&gt; provided
-in the math package. In this case,
-RangeKeyVal&lt;String,Double&gt;.
-
-
-
-The operator MinuteVolume:
-This operator reads from the input port, which contains the
-volume tuples from StockTickInput,
-and aggregates the data to provide the sum of the volume within one
-minute.  Like the operator  DailyVolume, this operator also uses
-SumKeyVal&lt;String,Long&gt;, but
-with cumulative set to false.  The
-Application Window is set to one minute. We will explain how to set this
-later.
-
-
-
-The operator Chart:
-This operator is very similar to the operator Quote, except that it takes inputs from
-High Low and  Minute Vol and outputs the consolidated tuples
-to the output port.
-
-
-
-The operator PriceSMA:
-SMA stands for - Simple Moving Average. It reads from the
-input port, which contains the price tuples from StockTickInput, and
-provides the moving average price of the stock.  It utilizes
-SimpleMovingAverage&lt;String,Double&gt;, which is provided in the
- multiwindow package.
-SimpleMovingAverage keeps track of the data of the previous N
-application windows in a sliding manner.  For each end window event, it
-provides the average of the data in those application windows.
-
-
-
-The operator Console:
-This operator just outputs the input tuples to the console
-(or stdout).  In this example, there are four console operators, which connect to the output
-of  Quote, Chart, PriceSMA and VolumeSMA.  In
-practice, they should be replaced by operators that use the data to
-produce visualization artifacts like charts.
-
-
-
-Connecting the operators together and constructing the
-DAG: Now that we know the
-operators used, we will create the DAG, set the streaming window size,
-instantiate the operators, and connect the operators together by adding
-streams that connect the output ports with the input ports among those
-operators.  This code is in the file  YahooFinanceApplication.java. Refer to Figure 1
-again for the graphical representation of the DAG.  The last method in
-the code, namely getApplication(),
-does all that.  The rest of the methods are just for setting up the
-operators.
-
-
-
-```java
-package com.datatorrent.demos.yahoofinance;
-
-import com.datatorrent.api.ApplicationFactory;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.Operator.InputPort;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.math.RangeKeyVal;
-import com.datatorrent.lib.math.SumKeyVal;
-import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
-import com.datatorrent.lib.stream.ConsolidatorKeyVal;
-import com.datatorrent.lib.util.HighLow;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Yahoo! Finance application demo. <p>
- *
- * Get Yahoo finance feed and calculate minute price range, minute volume, simple moving average of 5 minutes.
- */
-public class Application implements StreamingApplication
-{
-  private int streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms)
-  private int appWindowCountMinute = 60;   // 1 minute
-  private int appWindowCountSMA = 5 * 60;  // 5 minute
-
-  /**
-   * Get actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded price.
-   */
-  public StockTickInput getStockTickInputOperator(String name, DAG dag)
-  {
-    StockTickInput oper = dag.addOperator(name, StockTickInput.class);
-    oper.readIntervalMillis = 200;
-    return oper;
-  }
-
-  /**
-   * This sends total daily volume by adding volumes from each ticks.
-   */
-  public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag)
-  {
-    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
-    oper.setType(Long.class);
-    oper.setCumulative(true);
-    return oper;
-  }
-
-  /**
-   * Get aggregated volume of 1 minute and send at the end window of 1 minute.
-   */
-  public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)
-  {
-    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
-    oper.setType(Long.class);
-    oper.setEmitOnlyWhenChanged(true);
-dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);
-    return oper;
-  }
-
-  /**
-   * Get High-low range for 1 minute.
-   */
-  public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount)
-  {
-    RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>());
-    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);
-    oper.setType(Double.class);
-    return oper;
-  }
-
-  /**
-   * Quote (Merge price, daily volume, time)
-   */
-  public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag)
-  {
-    ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>());
-    return oper;
-  }
-
-  /**
-   * Chart (Merge minute volume and minute high-low)
-   */
-  public ConsolidatorKeyVal<String,HighLow,Long,?,?,?> getChartOperator(String name, DAG dag)
-  {
-    ConsolidatorKeyVal<String,HighLow,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow,Long,Object,Object,Object>());
-    return oper;
-  }
-
-  /**
-   * Get simple moving average of price.
-   */
-  public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)
-  {
-    SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>());
-    oper.setWindowSize(appWindowCount);
-    oper.setType(Double.class);
-    return oper;
-  }
-
-  /**
-   * Get console for output.
-   */
-  public InputPort<Object> getConsole(String name, /*String nodeName,*/ DAG dag, String prefix)
-  {
-    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
-    oper.setStringFormat(prefix + ": %s");
-    return oper.input;
-  }
-
-  /**
-   * Create Yahoo Finance Application DAG.
-   */
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    dag.getAttributes().put(DAG.STRAM_WINDOW_SIZE_MILLIS,streamingWindowSizeMilliSeconds);
-
-    StockTickInput tick = getStockTickInputOperator("StockTickInput", dag);
-    SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag);
-    ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag);
-
-    RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute);
-    SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute);
-    ConsolidatorKeyVal<String,HighLow,Long,?,?,?> chartOperator = getChartOperator("Chart", dag);
-
-    SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA);
-       DefaultPartitionCodec<String, Double> codec = new DefaultPartitionCodec<String, Double>();
-    dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);
-    dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);
-    dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
-    dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data);
-    dag.addStream("time", tick.time, quoteOperator.in3);
-    dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2);
-
-    dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE"));
-
-    dag.addStream("high_low", highlow.range, chartOperator.in1);
-    dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2);
-    dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART"));
-
-    dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA"));
-
-    return dag;
-  }
-
-}
-```
-
-
-
-Note that we also set a user-specific sliding window for SMA that
-keeps track of the previous N data points.  Do not confuse this with the
-attribute APPLICATION_WINDOW_COUNT.
-
-In the rest of this chapter we will run through the process of
-running this application. We assume that  you are familiar with details
-of your Hadoop infrastructure. For installation
-details please refer to the [Installation Guide](installation.md).
-
-
-Running a Test Application
------------------------------------------
-
-We will now describe how to run the yahoo
-finance application described above in different modes
-(local mode, single node on Hadoop, and multi-nodes on Hadoop).
-
-
-The platform runs streaming applications under the control of a
-light-weight Streaming Application Manager (STRAM). Each application has
-its own instance of STRAM. STRAM launches the application and
-continually provides run time monitoring, analysis, and takes action
-such as load scaling or outage recovery as needed.  We will discuss
-STRAM in more detail in the next chapter.
-
-
-
-The instructions below assume that the platform was installed in a
-directory &lt;INSTALL_DIR&gt; and the command line interface (CLI) will
-be used to launch the demo application. An application can be run in
-[local mode](#h.3dy6vkm)[ ](#h.3dy6vkm)(in IDE or from command line) or on a  [Hadoop cluster](#h.1t3h5sf) [.](#h.1t3h5sf)
-
-
-
-To start the dtCli run
-
-    <INSTALL_DIR>/bin/dtcli
-
-The command line prompt appears.  To start the application in local mode (the actual version number in the file name may differ)
-
-    dt> launch -local <INSTALL_DIR>/yahoo-finance-demo-3.2.0-SNAPSHOT.apa
-
-To terminate the application in local mode, enter Ctrl-C
-
-Tu run the application on the Hadoop cluster (the actual version
-number in the file name may differ)
-
-    dt> launch <INSTALL_DIR>/yahoo-finance-demo-3.2.0-SNAPSHOT.apa
-
-
-To stop the application running in Hadoop, terminate it in the dtCli:
-
-    dt> kill-app
-
-
-
-Executing the application in either mode includes the following
-steps. At a top level, STRAM (Streaming Application Manager) validates
-the application (DAG), translates the logical plan to the physical plan
-and then launches the execution engine. The mode determines the
-resources needed and how how they are used.
-
-Local Mode
------------------------
-
-In local mode, the application is run as a single-process with multiple threads. Although a
-few Hadoop classes are needed, there is no dependency on a Hadoop
-cluster or Hadoop services. The local file system is used in place of
-HDFS. This mode allows a quick run of an application in a single process
-sandbox, and hence is the most suitable to debug and analyze the
-application logic. This mode is recommended for developing the
-application and can be used for running applications within the IDE for
-functional testing purposes. Due to limited resources and lack  of
-scalability an application running in this single process mode is more
-likely to encounter throughput bottlenecks. A distributed cluster is
-recommended for benchmarking and production testing.
-
-Hadoop Cluster
----------------------------
-
-In this section we discuss various Hadoop cluster setups.
-
-### Single Node Cluster
-
-In a single node Hadoop cluster all services are deployed on a
-single server (a developer can use his/her development machine as a
-single node cluster). The platform does not distinguish between a single
-or multi-node setup and behaves exactly the same in both cases.
-
-
-
-In this mode, the resource manager, name node, data node, and node
-manager occupy one process each. This is an example of running a
-streaming application as a multi-process application on the same server.
-With prevalence of fast, multi-core systems, this mode is effective for
-debugging, fine tuning, and generic analysis before submitting the job
-to a larger Hadoop cluster. In this mode, execution uses the Hadoop
-services and hence is likely to identify issues that are related to the
-Hadoop environment (such issues will not be uncovered in local mode).
-The throughput will obviously not be as high as on a multi-node Hadoop
-cluster. Additionally, since each container (i.e. Java process) requires
-a significant amount of memory, you will be able to run a much smaller
-number of containers than on a multi-node cluster.
-
-### Multi-Node Cluster
-
-In a multi-node Hadoop cluster all the services of Hadoop are
-typically distributed across multiple nodes in a production or
-production-level test environment. Upon launch the application is
-submitted to the Hadoop cluster and executes as a  multi-processapplication on multiple nodes.
-
-
-
-Before you start deploying, testing and troubleshooting your
-application on a cluster, you should ensure that Hadoop (version 2.2.0
-or later) is properly installed and
-you have basic skills for working with it.
-
-------------------------------------------------------------------------
-
-
-
-
-
-Apache Apex Platform Overview
-========================================
-
-Streaming Computational Model
-------------------------------------------
-
-In this chapter, we describe the the basics of the real-time streaming platform and its computational model.
-
-
-The platform is designed to enable completely asynchronous real time computations done in as unblocked a way as possible with
-minimal overhead .
-
-
-
-Applications running in the platform are represented by a Directed
-Acyclic Graph (DAG) made up of  operators and streams. All computations
-are done in memory on arrival of
-the input data, with an option to save the output to disk (HDFS) in a
-non-blocking way. The data that flows between operators consists of
-atomic data elements. Each data element along with its type definition
-(henceforth called  schema) is
-called a tuple. An application is a
-design of the flow of these tuples to and from
-the appropriate compute units to enable the computation of the final
-desired results. A message queue (henceforth called
- buffer server) manages tuples streaming
-between compute units in different processes.This server keeps track of
-all consumers, publishers, partitions, and enables replay. More
-information is given in later section.
-
-
-
-The streaming application is monitored by a decision making entity
-called STRAM (streaming application
-manager). STRAM is designed to be a light weight
-controller that has minimal but sufficient interaction with the
-application. This is done via periodic heartbeats. The
-STRAM does the initial launch and periodically analyzes the system
-metrics to decide if any run time action needs to be taken.
-
-
-
-A fundamental building block for the streaming platform
-is the concept of breaking up a stream into equal finite time slices
-called streaming windows. Each window contains the ordered
-set of tuples in that time slice. A typical duration of a window is 500
-ms, but can be configured per application (the Yahoo! Finance
-application configures this value in the  properties.xml file to be 1000ms = 1s). Each
-window is preceded by a begin_window event and is terminated by an
-end_window event, and is assigned
-a unique window ID. Even though the platform performs computations at
-the tuple level, bookkeeping is done at the window boundary, making the
-computations within a window an atomic event in the platform.  We can
-think of each window as an  atomic
-micro-batch of tuples, to be processed together as one
-atomic operation (See Figure 2).  
-
-
-
-This atomic batching allows the platform to avoid the very steep
-per tuple bookkeeping cost and instead has a manageable per batch
-bookkeeping cost. This translates to higher throughput, low recovery
-time, and higher scalability. Later in this document we illustrate how
-the atomic micro-batch concept allows more efficient optimization
-algorithms.
-
-
-
-The platform also has in-built support for
-application windows.  An application window is part of the
-application specification, and can be a small or large multiple of the
-streaming window.  An example from our Yahoo! Finance test application
-is the moving average, calculated over a sliding application window of 5
-minutes which equates to 300 (= 5 \* 60) streaming windows.
-
-
-
-Note that these two window concepts are distinct.  A streaming
-window is an abstraction of many tuples into a higher atomic event for
-easier management.  An application window is a group of consecutive
-streaming windows used for data aggregation (e.g. sum, average, maximum,
-minimum) on a per operator level.
-
-![](images/application_development/ApplicationDeveloperGuide.html-image02.png)
-
-Alongside the platform, a set of
-predefined, benchmarked standard library operator templates is provided
-for ease of use and rapid development of application. These
-operators are open sourced to Apache Software Foundation under the
-project name “Malhar” as part of our efforts to foster community
-innovation. These operators can be used in a DAG as is, while others
-have  [properties](#h.32hioqz)
-[ ](#h.32hioqz)that can be set to specify the
-desired computation. Those interested in details, should refer to
-[Apex Malhar Operator Library](apex_malhar.md)
-.
-
-
-
-The platform is a Hadoop YARN native
-application. It runs in a Hadoop cluster just like any
-other YARN application (MapReduce etc.) and is designed to seamlessly
-integrate with rest of Hadoop technology stack. It leverages Hadoop as
-much as possible and relies on it as its distributed operating system.
-Hadoop dependencies include resource management, compute/memory/network
-allocation, HDFS, security, fault tolerance, monitoring, metrics,
-multi-tenancy, logging etc. Hadoop classes/concepts are reused as much
-as possible.  The aim is to enable enterprises
-to leverage their existing Hadoop infrastructure for real time streaming
-applications. The platform is designed to scale with big
-data applications and scale with Hadoop.
-
-
-
-A streaming application is an asynchronous execution of
-computations across distributed nodes. All computations are done in
-parallel on a distributed cluster. The computation model is designed to
-do as many parallel computations as possible in a non-blocking fashion.
-The task of monitoring of the entire application is done on (streaming)
-window boundaries with a streaming window as an atomic entity. A window
-completion is a quantum of work done. There is no assumption that an
-operator can be interrupted at precisely a particular tuple or window.
-
-
-
-
-An operator itself also
-cannot assume or predict the exact time a tuple that it emitted would
-get consumed by downstream operators. The operator processes the tuples
-it gets and simply emits new tuples based on its business logic. The
-only guarantee it has is that the upstream operators are processing
-either the current or some later window, and the downstream operator is
-processing either the current or some earlier window. The completion of
-a window (i.e. propagation of the  end_window event through an operator) in any
-operator guarantees that all upstream operators have finished processing
-this window. Thus, the end_window event is blocking on an operator
-with multiple outputs, and is a synchronization point in the DAG. The
- begin_window event does not have
-any such restriction, a single begin_window event from any upstream operator
-triggers the operator to start processing tuples.
-
-Streaming Application Manager (STRAM)
---------------------------------------------------
-
-Streaming Application Manager (STRAM) is the Hadoop YARN native
-application master. STRAM is the first process that is activated upon
-application launch and orchestrates the streaming application on the
-platform. STRAM is a lightweight controller process. The
-responsibilities of STRAM include
-
-1.  Running the Application
-
-    *  Read the logical plan of the application (DAG) submitted by the client
-    *  Validate the logical plan
-    *  Translate the logical plan into a physical plan, where certain operators may  be partitioned (i.e. replicated) to multiple operators for  handling load.
-    *  Request resources (Hadoop containers) from Resource Manager,
-        per physical plan
-    *  Based on acquired resources and application attributes, create
-        an execution plan by partitioning the DAG into fragments,
-        each assigned to different containers.
-    *  Executes the application by deploying each fragment to
-        its container. Containers then start stream processing and run
-        autonomously, processing one streaming window after another. Each
-        container is represented as an instance of the  StreamingContainer class, which updates
-        STRAM via the heartbeat protocol and processes directions received
-        from STRAM.
-
-2.  Continually monitoring the application via heartbeats from each StreamingContainer
-3.  Collecting Application System Statistics and Logs
-4.  Logging all application-wide decisions taken
-5.  Providing system data on the state of the application via a  Web Service.
-6.  Supporting [Fault Tolerance](#h.2nusc19)
-
-    a.  Detecting a node outage
-    b.  Requesting a replacement resource from the Resource Manager
-        and scheduling state restoration for the streaming operators
-    c.  Saving state to Zookeeper
-
-7.  Supporting [Dynamic
-    Partitioning](#h.3hv69ve)[:](#h.3hv69ve) Periodically
-    evaluating the SLA and modifying the physical plan if required
-    (logical plan does not change).
-8.  Enabling [Security](#h.3q5sasy)[:](#h.3q5sasy) Distributing
-    security tokens for distributed components of the execution engine
-    and securing web service requests.
-9.  Enabling [Dynamic  modification](#h.40ew0vw)[ ](#h.40ew0vw)of
-    DAG: In the future, we intend to allow for user initiated
-    modification of the logical plan to allow for changes to the
-    processing logic and functionality.
-
-
-
-An example of the Yahoo! Finance Quote application scheduled on a
-cluster of 5 Hadoop containers (processes) is shown in Figure 3.
-
-![](images/application_development/ApplicationDeveloperGuide.html-image01.png)
-
-
-
-An example for the translation from a logical plan to a physical
-plan and an execution plan for a subset of the application is shown in
-Figure 4.
-
-
-
-![](images/application_development/ApplicationDeveloperGuide.html-image04.png)
-
-
-
-
-
-Hadoop Components
-------------------------------
-
-In this section we cover some aspects of Hadoop that your
-streaming application interacts with. This section is not meant to
-educate the reader on Hadoop, but just get the reader acquainted with
-the terms. We strongly advise readers to learn Hadoop from other
-sources.
-
-A streaming application runs as a native Hadoop 2.2 application.
-Hadoop 2.2 does not differentiate between a map-reduce job and other
-applications, and hence as far as Hadoop is concerned, the streaming
-application is just another job. This means that your application
-leverages all the bells and whistles Hadoop provides and is fully
-supported within Hadoop technology stack. The platform is responsible
-for properly integrating itself with the relevant components of Hadoop
-that exist today and those that may emerge in the future
-
-
-
-All investments that leverage multi-tenancy (for example quotas
-and queues), security (for example kerberos), data flow integration (for
-example copying data in-out of HDFS), monitoring, metrics collections,
-etc. will require no changes when streaming applications run on
-Hadoop.
-
-### YARN
-
-[YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site)is
-the core library of Hadoop 2.2 that is tasked with resource management
-and works as a distributed application framework. In this section we
-will walk through Yarn's components. In Hadoop 2.2, the old jobTracker
-has been replaced by a combination of ResourceManager (RM) and
-ApplicationMaster (AM).
-
-#### Resource Manager (RM)
-
-[ResourceManager](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html)(RM)
-manages all the distributed resources. It allocates and arbitrates all
-the slots and the resources (cpu, memory, network) of these slots. It
-works with per-node NodeManagers (NMs) and per-application
-ApplicationMasters (AMs). Currently memory usage is monitored by RM; in
-upcoming releases it will have CPU as well as network management. RM is
-shared by map-reduce and streaming applications. Running streaming
-applications requires no changes in the RM.
-
-#### Application Master (AM)
-
-The AM is the watchdog or monitoring process for your application
-and has the responsibility of negotiating resources with RM and
-interacting with NodeManagers to get the allocated containers started.
-The AM is the starting point of your application and is considered user
-code (not system Hadoop code). The AM itself runs in one container. All
-resource management within the application are managed by the AM. This
-is a critical feature for Hadoop 2.2 where tasks done by jobTracker in
-Hadoop 1.0 have been distributed allowing Hadoop 2.2 to scale much
-beyond Hadoop 1.0. STRAM is a native YARN ApplicationManager.
-
-#### Node Managers (NM)
-
-There is one [NodeManager](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html)(NM)
-per node in the cluster. All the containers (i.e. processes) on that
-node are monitored by the NM. It takes instructions from RM and manages
-resources of that node as per RM instructions. NMs interactions are same
-for map-reduce and for streaming applications. Running streaming
-applications requires no changes in the NM.
-
-#### RPC Protocol
-
-Communication among RM, AM, and NM is done via the Hadoop RPC
-protocol. Streaming applications use the same protocol to send their
-data. No changes are needed in RPC support provided by Hadoop to enable
-communication done by components of your application.
-
-### HDFS
-
-Hadoop includes a highly fault tolerant, high throughput
-distributed file system ([HDFS](http://hadoop.apache.org/docs/r1.0.4/hdfs_design.html)).
-It runs on commodity hardware, and your streaming application will, by
-default, use it. There is no difference between files created by a
-streaming application and those created by map-reduce.
-
-Developing An Application
-======================================
-
-In this chapter we describe the methodology to develop an
-application using the Realtime Streaming Platform. The platform was
-designed to make it easy to build and launch sophisticated streaming
-applications with the developer having to deal only with the
-application/business logic. The platform deals with details of where to
-run what operators on which servers and how to correctly route streams
-of data among them.
-
-Development Process
---------------------------------
-
-While the platform does not mandate a specific methodology or set
-of development tools, we have recommendations to maximize productivity
-for the different phases of application development.
-
-#### Design
-
--   Identify common, reusable operators. Use a library
-    if possible.
--   Identify scalability and performance requirements before
-    designing the DAG.
--   Leverage attributes that the platform supports for scalability
-    and performance.
--   Use operators that are benchmarked and tested so that later
-    surprises are minimized. If you have glue code, create appropriate
-    unit tests for it.
--   Use THREAD_LOCAL locality for high throughput streams. If all
-    the operators on that stream cannot fit in one container,
-    try NODE_LOCAL locality. Both THREAD_LOCAL and
-    NODE_LOCAL streams avoid the Network Interface Card (NIC)
-    completly. The former uses intra-process communication to also avoid
-    serialization-deserialization overhead.
--   The overall throughput and latencies are are not necessarily
-    correlated to the number of operators in a simple way -- the
-    relationship is more nuanced. A lot depends on how much work
-    individual operators are doing, how many are able to operate in
-    parallel, and how much data is flowing through the arcs of the DAG.
-    It is, at times, better to break a computation down into its
-    constituent simple parts and then stitch them together via streams
-    to better utilize the compute resources of the cluster. Decide on a
-    per application basis the fine line between complexity of each
-    operator vs too many streams. Doing multiple computations in one
-    operator does save network I/O, while operators that are too complex
-    are hard to maintain.
--   Do not use operators that depend on the order of two streams
-    as far as possible. In such cases behavior is not idempotent.
--   Persist key information to HDFS if possible; it may be useful
-    for debugging later.
--   Decide on an appropriate fault tolerance mechanism. If some
-    data loss is acceptable, use the at-most-once mechanism as it has
-    fastest recovery.
-
-#### Creating New Project
-
-Please refer to the [Apex Application Packages](application_packages.md) for
-the basic steps for creating a new project.
-
-#### Writing the application code
-
-Preferably use an IDE (Eclipse, Netbeans etc.) that allows you to
-manage dependencies and assists with the Java coding. Specific benefits
-include ease of managing operator library jar files, individual operator
-classes, ports and properties. It will also highlight and assist to
-rectify issues such as type mismatches when adding streams while
-typing.
-
-#### Testing
-
-Write test cases with JUnit or similar test framework so that code
-is tested as it is written. For such testing, the DAG can run in local
-mode within the IDE. Doing this may involve writing mock input or output
-operators for the integration points with external systems. For example,
-instead of reading from a live data stream, the application in test mode
-can read from and write to files. This can be done with a single
-application DAG by instrumenting a test mode using settings in the
-configuration that is passed to the application factory
-interface.
-
-Good test coverage will not only eliminate basic validation errors
-such as missing port connections or property constraint violations, but
-also validate the correct processing of the data. The same tests can be
-re-run whenever the application or its dependencies change (operator
-libraries, version of the platform etc.)
-
-#### Running an application
-
-The platform provides a commandline tool called dtcli for managing applications (launching,
-killing, viewing, etc.). This tool was already discussed above briefly
-in the section entitled Running the Test Application. It will introspect
-the jar file specified with the launch command for applications (classes
-that implement ApplicationFactory) or property files that define
-applications. It will also deploy the dependency jar files from the
-application package to the cluster.
-
-
-
-Dtcli can run the application in local mode (i.e. outside a
-cluster). It is recommended to first run the application in local mode
-in the development environment before launching on the Hadoop cluster.
-This way some of the external system integration and correct
-functionality of the application can be verified in an easier to debug
-environment before testing distributed mode.
-
-
-
-For more details on CLI please refer to the [dtCli Guide](dtcli.md).
-
-Application API
-----------------------------
-
-This section introduces the API to write a streaming application.
-The work involves connecting operators via streams to form the logical
-DAG. The steps are
-
-1.  Instantiate an application (DAG)
-
-2.  (Optional) Set Attributes
-    *  Assign application name
-    *  Set any other attributes as per application requirements
-
-3.  Create/re-use and instantiate operators
-    *  Assign operator name that is unique within the  application
-    *  Declare schema upfront for each operator (and thereby its  [ports](#h.ihv636)[)](#h.ihv636)
-    *  (Optional) Set [properties](#h.32hioqz)[ ](#h.32hioqz) and [attributes](#h.41mghml)[ ](#h.41mghml) on the dag as per specification
-    *  Connect ports of operators via streams
-        *  Each stream connects one output port of an operator to one or  more input ports of other operators.
-        *  (Optional) Set attributes on the streams
-
-4.  Test the application.
-
-
-
-There are two methods to create an application, namely Java, and
-Properties file. Java API is for applications being developed by humans,
-and properties file (Hadoop like) is more suited for DAGs generated by
-tools.
-
-### Java API
-
-The Java API is the most common way to create a streaming
-application. It is meant for application developers who prefer to
-leverage the features of Java, and the ease of use and enhanced
-productivity provided by IDEs like NetBeans or Eclipse. Using Java to
-specify the application provides extra validation abilities of Java
-compiler, such as compile time checks for type safety at the time of
-writing the code. Later in this chapter you can read more about
-validation support in the platform.
-
-The developer specifies the streaming application by implementing
-the ApplicationFactory interface, which is how platform tools (CLI etc.)
-recognize and instantiate applications. Here we show how to create a
-Yahoo! Finance application that streams the last trade price of a ticker
-and computes the high and low price in every 1 min window. Run above
- test application to execute the
-DAG in local mode within the IDE.
-
-
-
-Let us revisit how the Yahoo! Finance test application constructs the DAG:
-
-
-
-```java
-public class Application implements StreamingApplication
-{
-
-  ...
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds);
-
-    StockTickInput tick = getStockTickInputOperator("StockTickInput", dag);
-    SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag);
-    ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag);
-
-    RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute);
-    SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute);
-    ConsolidatorKeyVal<String,HighLow,Long,?,?,?> chartOperator = getChartOperator("Chart", dag);
-
-    SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA);
-
-    dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
-    dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data);
-    dag.addStream("time", tick.time, quoteOperator.in3);
-    dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2);
-
-    dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE"));
-
-    dag.addStream("high_low", highlow.range, chartOperator.in1);
-    dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2);
-    dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART"));
-
-    dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA"));
-
-    return dag;
-  }
-}
-```
-
-
-
-
-### Property File API
-
-The platform also supports specification of a DAG via a property
-file. The aim here to make it easy for tools to create and run an
-application. This method of specification does not have the Java
-compiler support of compile time check, but since these applications
-would be created by software, they should be correct by construction.
-The syntax is derived from Hadoop properties and should be easy for
-folks who are used to creating software that integrated with
-Hadoop.
-
-
-
-Create an application (DAG): myApplication.properties
-
-
-```
-# input operator that reads from a file
-dt.operator.inputOp.classname=com.acme.SampleInputOperator
-dt.operator.inputOp.fileName=somefile.txt
-
-# output operator that writes to the console
-dt.operator.outputOp.classname=com.acme.ConsoleOutputOperator
-
-# stream connecting both operators
-dt.stream.inputStream.source=inputOp.outputPort
-dt.stream.inputStream.sinks=outputOp.inputPort
-```
-
-
-
-Above snippet is intended to convey the basic idea of specifying
-the DAG without using Java. Operators would come from a predefined
-library and referenced in the specification by class name and port names
-(obtained from the library providers documentation or runtime
-introspection by tools). For those interested in details, see later
-sections and refer to the  Operation and
-Installation Guide mentioned above.
-
-### Attributes
-
-Attributes impact the runtime behavior of the application. They do
-not impact the functionality. An example of an attribute is application
-name. Setting it changes the application name. Another example is
-streaming window size. Setting it changes the streaming window size from
-the default value to the specified value. Users cannot add new
-attributes, they can only choose from the ones that come packaged and
-pre-supported by the platform. Details of attributes are covered in the
- Operation and Installation
-Guide.
-
-Operators
-----------------------
-
-Operators are basic compute units.
-Operators process each incoming tuple and emit zero or more tuples on
-output ports as per the business logic. The data flow, connectivity,
-fault tolerance (node outage), etc. is taken care of by the platform. As
-an operator developer, all that is needed is to figure out what to do
-with the incoming tuple and when (and which output port) to send out a
-particular output tuple. Correctly designed operators will most likely
-get reused. Operator design needs care and foresight. For details, refer
-to the  [Operator Developer Guide](operator_development.md). As an application developer you need to connect operators
-in a way that it implements your business logic. You may also require
-operator customization for functionality and use attributes for
-performance/scalability etc.
-
-
-
-All operators process tuples asynchronously in a distributed
-cluster. An operator cannot assume or predict the exact time a tuple
-that it emitted will get consumed by a downstream operator. An operator
-also cannot predict the exact time when a tuple arrives from an upstream
-operator. The only guarantee is that the upstream operators are
-processing the current or a future window, i.e. the windowId of upstream
-operator is equals or exceeds its own windowId. Conversely the windowId
-of a downstream operator is less than or equals its own windowId. The
-end of a window operation, i.e. the API call to endWindow on an operator
-requires that all upstream operators have finished processing this
-window. This means that completion of processing a window propagates in
-a blocking fashion through an operator. Later sections provides more
-details on streams and data flow of tuples.
-
-
-
-Each operator has a unique name within the DAG as provided by the
-user. This is the name of the operator in the logical plan. The name of
-the operator in the physical plan is an integer assigned to it by STRAM.
-These integers are use the sequence from 1 to N, where N is total number
-of physically unique operators in the DAG.  Following the same rule,
-each partitioned instance of a logical operator has its own integer as
-an id. This id along with the Hadoop container name uniquely identifies
-the operator in the execution plan of the DAG. The logical names and the
-physical names are required for web service support. Operators can be
-accessed via both names. These same names are used while interacting
-with  dtcli to access an operator.
-Ideally these names should be self-descriptive. For example in Figure 1,
-the node named “Daily volume” has a physical identifier of 2.
-
-### Operator Interface
-
-Operator interface in a DAG consists of [ports](#h.ihv636)[,](#h.ihv636) [properties](#h.32hioqz)[,](#h.32hioqz) and
- [attributes](#h.41mghml)
-[.](#h.41mghml) Operators interact with other
-components of the DAG via ports. Functional behavior of the operators
-can be customized via parameters. Run time performance and physical
-instantiation is controlled by attributes. Ports and parameters are
-fields (variables) of the Operator class/object, while attributes are
-meta information that is attached to the operator object via an
-AttributeMap. An operator must have at least one port. Properties are
-optional. Attributes are provided by the platform and always have a
-default value that enables normal functioning of operators.
-
-#### Ports
-
-Ports are connection points by which an operator receives and
-emits tuples. These should be transient objects instantiated in the
-operator object, that implement particular interfaces. Ports should be
-transient as they contain no state. They have a pre-defined schema and
-can only be connected to other ports with the same schema. An input port
-needs to implement the interface  Operator.InputPort and
-interface Sink. A default
-implementation of these is provided by the abstract class DefaultInputPort. An output port needs to
-implement the interface  Operator.OutputPort. A default implementation
-of this is provided by the concrete class DefaultOutputPort. These two are a quick way to
-implement the above interfaces, but operator developers have the option
-of providing their own implementations.
-
-
-
-Here are examples of an input and an output port from the operator
-Sum.
-
-
-
-```java
-@InputPortFieldAnnotation(name = "data")
-public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() {
-  @Override
-  public void process(V tuple)
-  {
-  	...
-  }
-}
-@OutputPortFieldAnnotation(optional=true)
-public final transient DefaultOutputPort<V> sum = new DefaultOutputPort<V>(){ … };
-```
-
-
-
-
-The process call is in the Sink interface. An emit on an output
-port is done via emit(tuple) call. For the above example it would be
-sum.emit(t), where the type of t is the generic parameter V.
-
-
-
-There is no limit on how many ports an operator can have. However
-any operator must have at least one port. An operator with only one port
-is called an Input Adapter if it has no input port and an Output Adapter
-if it has no output port. These are special operators needed to get/read
-data from outside system/source into the application, or push/write data
-into an outside system/sink. These could be in Hadoop or outside of
-Hadoop. These two operators are in essence gateways for the streaming
-application to communicate with systems outside the application.
-
-
-
-Port connectivity can be validated during compile time by adding
-PortFieldAnnotations shown above. By default all ports have to be
-connected, to allow a port to go unconnected, you need to add
-“optional=true” to the annotation.
-
-
-
-Attributes can be specified for ports that affect the runtime
-behavior. An example of an attribute is parallel partition that specifes
-a parallel computation flow per partition. It is described in detail in
-the [Parallel
-Partitions](#h.3vac5uf)[ ](#h.3vac5uf)section.
-Another example is queue capacity that specifies the buffer size for the
-port. Details of attributes are covered in  Operation and Installation Guide.
-
-#### Properties
-
-Properties are the abstractions by which functional behavior of an
-operator can be customized. They should be non-transient objects
-instantiated in the operator object. They need to be non-transient since
-they are part of the operator state and re-construction of the operator
-object from its checkpointed state must restore the operator to the
-desired state. Properties are optional, i.e. an operator may or may not
-have properties; they are part of user code and their values are not
-interpreted by the platform in any way.
-
-
-
-All non-serializable objects should be declared transient.
-Examples include sockets, session information, etc. These objects should
-be initialized during setup call, which is called every time the
-operator is initialized.
-
-#### Attributes
-
-Attributes are values assigned to the operators that impact
-run-time. This includes things like the number of partitions, at most
-once or at least once or exactly once recovery modes, etc. Attributes do
-not impact functionality of the operator. Users can change certain
-attributes in runtime. Users cannot add attributes to operators; they
-are pre-defined by the platform. They are interpreted by the platform
-and thus cannot be defined in user created code (like properties).
-Details of attributes are covered in  [Configuration Guide](configuration.md).
-
-### Operator State
-
-The state of an operator is defined as the data that it transfers
-from one window to a future window. Since the computing model of the
-platform is to treat windows like micro-batches, the operator state can
-be [checkpointed](#h.3mzq4wv)[ ](#h.3mzq4wv)every
-Nth window, or every T units of time, where T is significantly greater
-than the streaming window.  When an operator is checkpointed, the entire
-object is written to HDFS.  The larger the amount of state in an
-operator, the longer it takes to recover from a failure. A stateless
-operator can recover much quicker than a stateful one. The needed
-windows are preserved by the upstream buffer server and are used to
-recompute the lost windows, and also rebuild the buffer server in the
-current container.
-
-
-
-The distinction between Stateless and Stateful is based solely on
-the need to transfer data in the operator from one window to the next.
-The state of an operator is independent of the number of ports.
-
-#### Stateless
-
-A Stateless operator is defined as one where no data is needed to
-be kept at the end of every window. This means that all the computations
-of a window can be derived from all the tuples the operator receives
-within that window. This guarantees that the output of any window can be
-reconstructed by simply replaying the tuples that arrived in that
-window. Stateless operators are more efficient in terms of fault
-tolerance, and cost to achieve SLA.
-
-#### Stateful
-
-A Stateful operator is defined as one where data is needed to be
-stored at the end of a window for computations occurring in later
-window; a common example is the computation of a sum of values in the
-input tuples.
-
-### Operator API
-
-The Operator API consists of methods that operator developers may
-need to override. In this section we will discuss the Operator APIs from
-the point of view of an application developer. Knowledge of how an
-operator works internally is critical for writing an application. Those
-interested in the details should refer to  Malhar Operator Developer Guide.
-
-
-
-The APIs are available in three modes, namely Single Streaming
-Window, Sliding Application Window, and Aggregate Application Window.
-These are not mutually exclusive, i.e. an operator can use single
-streaming window as well as sliding application window. A physical
-instance of an operator is always processing tuples from a single
-window. The processing of tuples is guaranteed to be sequential, no
-matter which input port the tuples arrive on.
-
-
-
-In the later part of this section we will evaluate three common
-uses of streaming windows by applications. They have different
-characteristics and implications on optimization and recovery mechanisms
-(i.e. algorithm used to recover a node after outage) as discussed later
-in the section.
-
-#### Streaming Window
-
-Streaming window is atomic micro-batch computation period. The API
-methods relating to a streaming window are as follows
-
-
-
-```java
-public void process(<tuple_type> tuple) // Called on the input port on which the tuple arrives
-public void beginWindow(long windowId) // Called at the start of the window as soon as the first begin_window tuple arrives
-public void endWindow() // Called at the end of the window after end_window tuples arrive on all input ports
-public void setup(OperatorContext context) // Called once during initialization of the operator
-public void teardown() // Called once when the operator is being shutdown
-```
-
-
-A tuple can be emitted in any of the three streaming run-time
-calls, namely beginWindow, process, and endWindow but not in setup or
-teardown.
-
-#### Aggregate Application Window
-
-An operator with an aggregate window is stateful within the
-application window timeframe and possibly stateless at the end of that
-application window. An size of an aggregate application window is an
-operator attribute and is defined as a multiple of the streaming window
-size. The platform recognizes this attribute and optimizes the operator.
-The beginWindow, and endWindow calls are not invoked for those streaming
-windows that do not align with the application window. For example in
-case of streaming window of 0.5 second and application window of 5
-minute, an application window spans 600 streaming windows (5\*60\*2 =
-600). At the start of the sequence of these 600 atomic streaming
-windows, a beginWindow gets invoked, and at the end of these 600
-streaming windows an endWindow gets invoked. All the intermediate
-streaming windows do not invoke beginWindow or endWindow. Bookkeeping,
-node recovery, stats, UI, etc. continue to work off streaming windows.
-For example if operators are being checkpointed say on an average every
-30th window, then the above application window would have about 20
-checkpoints.
-
-#### Sliding Application Window
-
-A sliding window is computations that requires previous N
-streaming windows. After each streaming window the Nth past window is
-dropped and the new window is added to the computation. An operator with
-sliding window is a stateful operator at end of any window. The sliding
-window period is an attribute and is a multiple of streaming window. The
-platform recognizes this attribute and leverages it during bookkeeping.
-A sliding aggregate window with tolerance to data loss does not have a
-very high bookkeeping cost. The cost of all three recovery mechanisms,
- at most once (data loss tolerant),
-at least once (data loss
-intolerant), and exactly once (data
-loss intolerant and no extra computations) is same as recovery
-mechanisms based on streaming window. STRAM is not able to leverage this
-operator for any extra optimization.
-
-### Single vs Multi-Input Operator
-
-A single-input operator by definition has a single upstream
-operator, since there can only be one writing port for a stream.  If an
-operator has a single upstream operator, then the beginWindow on the
-upstream also blocks the beginWindow of the single-input operator. For
-an operator to start processing any window at least one upstream
-operator has to start processing that window. A multi-input operator
-reads from more than one upstream ports. Such an operator would start
-processing as soon as the first begin_window event arrives. However the
-window would not close (i.e. invoke endWindow) till all ports receive
-end_window events for that windowId. Thus the end of a window is a
-blocking event. As we saw earlier, a multi-input operator is also the
-point in the DAG where windows of all upstream operators are
-synchronized. The windows (atomic micro-batches) from a faster (or just
-ahead in processing) upstream operators are queued up till the slower
-upstream operator catches up. STRAM monitors such bottlenecks and takes
-corrective actions. The platform ensures minimal delay, i.e processing
-starts as long as at least one upstream operator has started
-processing.
-
-### Recovery Mechanisms
-
-Application developers can set any of the recovery mechanisms
-below to deal with node outage. In general, the cost of recovery depends
-on the state of the operator, while data integrity is dependant on the
-application. The mechanisms are per window as the platform treats
-windows as atomic compute units. Three recovery mechanisms are
-supported, namely
-
--   At-least-once: All atomic batches are processed at least once.
-    No data loss occurs.
--   At-most-once: All atomic batches are processed at most once.
-    Data loss is possible; this is the most efficient setting.
--   Exactly-once: All atomic batches are processed exactly once.
-    No data loss occurs; this is the least efficient setting since
-    additional work is needed to ensure proper semantics.
-
-At-least-once is the default. During a recovery event, the
-operator connects to the upstream buffer server and asks for windows to
-be replayed. At-least-once and exactly-once mechanisms start from its
-checkpointed state. At-most-once starts from the next begin-window
-event.
-
-
-
-Recovery mechanisms can be specified per Operator while writing
-the application as shown below.
-
-
-```java
-Operator o = dag.addOperator(“operator”, …);
-dag.setAttribute(o,  OperatorContext.PROCESSING_MODE,  ProcessingMode.AT_MOST_ONCE);
-```
-
-
-Also note that once an operator is attributed to AT_MOST_ONCE,
-all the operators downstream to it have to be AT_MOST_ONCE. The client
-will give appropriate warnings or errors if that’s not the case.
-
-
-
-Details are explained in the chapter on Fault Tolerance
-below[.](#h.2nusc19)
-
-Streams
---------------------
-
-A stream is a connector
-(edge) abstraction, and is a fundamental building block of the platform.
-A stream consists of tuples that flow from one port (called the
-output port) to one or more ports
-on other operators (called  input ports) another -- so note a potentially
-confusing aspect of this terminology: tuples enter a stream through its
-output port and leave via one or more input ports. A stream has the
-following characteristics
-
--   Tuples are always delivered in the same order in which they
-    were emitted.
--   Consists of a sequence of windows one after another. Each
-    window being a collection of in-order tuples.
--   A stream that connects two containers passes through a
-    buffer server.
--   All streams can be persisted (by default in HDFS).
--   Exactly one output port writes to the stream.
--   Can be read by one or more input ports.
--   Connects operators within an application, not outside
-    an application.
--   Has an unique name within an application.
--   Has attributes which act as hints to STRAM.
--   Streams have four modes, namely in-line, in-node, in-rack,
-    and other. Modes may be overruled (for example due to lack
-    of containers). They are defined as follows:
-
-    -   THREAD_LOCAL: In the same thread, uses thread
-        stack (intra-thread). This mode can only be used for a downstream
-        operator which has only one input port connected; also called
-        in-line.
-    -   CONTAINER_LOCAL: In the same container (intra-process); also
-        called in-container.
-    -   NODE_LOCAL: In the same Hadoop node (inter processes, skips
-        NIC); also called in-node.
-    -   RACK_LOCAL: On nodes in the same rack; also called
-        in-rack.
-    -   unspecified: No guarantee. Could be anywhere within the
-        cluster
-
-
-
-An example of a stream declaration is given below
-
-
-
-```java
-DAG dag = new DAG();
- …
-dag.addStream("views", viewAggregate.sum, cost.data).setLocality(CONTAINER_LOCAL); // A container local  stream
-dag.addStream(“clicks”, clickAggregate.sum, rev.data); // An example of unspecified locality
-```
-
-
-The platform guarantees in-order delivery of tuples in a stream.
-STRAM views each stream as collection of ordered windows. Since no tuple
-can exist outside a window, a replay of a stream consists of replay of a
-set of windows. When multiple input ports read the same stream, the
-execution plan of a stream ensures that each input port is logically not
-blocked by the reading of another input port. The schema of a stream is
-same as the schema of the tuple.
-
-
-
-In a stream all tuples emitted by an operator in a window belong
-to that window. A replay of this window would consists of an in-order
-replay of all the tuples. Thus the tuple order within a stream is
-guaranteed. However since an operator may receive multiple streams (for
-example an operator with two input ports), the order of arrival of two
-tuples belonging to different streams is not guaranteed. In general in
-an asynchronous distributed architecture this is expected. Thus the
-operator (specially one with multiple input ports) should not depend on
-the tuple order from two streams. One way to cope with this
-indeterminate order, if necessary, is to wait to get all the tuples of a
-window and emit results in endWindow call. All operator templates
-provided as part of  [standard operator template
-library](#h.3ep43zb) [ ](#h.3ep43zb)follow
-these principles.
-
-
-
-A logical stream gets partitioned into physical streams each
-connecting the partition to the upstream operator. If two different
-attributes are needed on the same stream, it should be split using
-StreamDuplicator operator.
-
-
-
-Modes of the streams are critical for performance. An in-line
-stream is the most optimal as it simply delivers the tuple as-is without
-serialization-deserialization. Streams should be marked
-container_local, specially in case where there is a large tuple volume
-between two operators which then on drops significantly. Since the
-setLocality call merely provides a hint, STRAM may ignore it. An In-node
-stream is not as efficient as an in-line one, but it is clearly better
-than going off-node since it still avoids the potential bottleneck of
-the network card.
-
-
-
-THREAD_LOCAL and CONTAINER_LOCAL streams do not use a buffer
-server as this stream is in a single process. The other two do.
-
-Validating an Application
---------------------------------------
-
-The platform provides various ways of validating the application
-specification and data input. An understanding of these checks is very
-important for an application developer since it affects productivity.
-Validation of an application is done in three phases, namely
-
-
-
-1.  Compile Time: Caught during application development, and is
-    most cost effective. These checks are mainly done on declarative
-    objects and leverages the Java compiler. An example is checking that
-    the schemas specified on all ports of a stream are
-    mutually compatible.
-2.  Initialization Time: When the application is being
-    initialized, before submitting to Hadoop. These checks are related
-    to configuration/context of an application, and are done by the
-    logical DAG builder implementation. An example is the checking that
-    all non-optional ports are connected to other ports.
-3.  Run Time: Validations done when the application is running.
-    This is the costliest of all checks. These are checks that can only
-    be done at runtime as they involve data. For example divide by 0
-    check as part of business logic.
-
-### Compile Time
-
-Compile time validations apply when an application is specified in
-Java code and include all checks that can be done by Java compiler in
-the development environment (including IDEs like NetBeans or Eclipse).
-Examples include
-
-1.  Schema Validation: The tuples on ports are POJO (plain old
-    java objects) and compiler checks to ensure that all the ports on a
-    stream have the same schema.
-2.  Stream Check: Single Output Port and at least one Input port
-    per stream. A stream can only have one output port writer. This is
-    part of the addStream api. This
-    check ensures that developers only connect one output port to
-    a stream. The same signature also ensures that there is at least one
-    input port for a stream
-3.  Naming: Compile time checks ensures that applications
-    components operators, streams are named
-
-### Initialization/Instantiation Time
-
-Initialization time validations include various checks that are
-done post compile, and before the application starts running in a
-cluster (or local mode). These are mainly configuration/contextual in
-nature. These checks are as critical to proper functionality of the
-application as the compile time validations.
-
-
-
-Examples include
-
--   [JavaBeans Validation](http://docs.oracle.com/javaee/6/tutorial/doc/gircz.html):
-    Examples include
-
-    -   @Max(): Value must be less than or equal to the number
-    -   @Min(): Value must be greater than or equal to the
-        number
-    -   @NotNull: The value of the field or property must not be
-        null
-    -   @Pattern(regexp = “....”): Value must match the regular
-        expression
-    -   Input port connectivity: By default, every non-optional input
-        port must be connected. A port can be declared optional by using an
-        annotation:     @InputPortFieldAnnotation(name = "...", optional
-        = true)
-    -   Output Port Connectivity: Similar. The annotation here is:    
-        @OutputPortFieldAnnotation(name = "...", optional = true)
-
--   Unique names in application scope: Operators, streams, must have
-    unique names.
--   Cycles in the dag: DAG cannot have a cycle.
--   Unique names in operator scope: Ports, properties, annotations
-    must have unique names.
--   One stream per port: A port can connect to only one stream.
-    This check applies to input as well as output ports even though an
-    output port can technically write to two streams. If you must have
-    two streams originating from a single output port, use  a streamDuplicator operator.
--   Application Window Period: Has to be an integral multiple the
-    streaming window period.
-
-### Run Time
-
-Run time checks are those that are done when the application is
-running. The real-time streaming platform provides rich run time error
-handling mechanisms. The checks are exclusively done by the application
-business logic, but the platform allows applications to count and audit
-these. Some of these features are in the process of development (backend
-and UI) and this section will be updated as they are developed. Upon
-completion examples will be added to  [demos](#h.upglbi) [t](#h.upglbi)o
-illustrate these.
-
-
-
-Error ports are output ports with error annotations. Since they
-are normal ports, they can be monitored and tuples counted, persisted
-and counts shown in the UI.
-
-------------------------------------------------------------------------
-
-
-
-
-
-Multi-Tenancy and Security
-=======================================
-
-Hadoop is a multi-tenant distributed operating system. Security is
-an intrinsic element of multi-tenancy as without it a cluster cannot be
-reasonably be shared among enterprise applications. Streaming
-applications follow all multi-tenancy security models used in Hadoop as
-they are native Hadoop applications. For details refer to the
-[Operation and Installation
-Guide](https://www.datatorrent.com/docs/guides/OperationandInstallationGuide.html)
-.
-
-Security
----------------------
-
-The platform includes Kerberos support. Both access points, namely
-STRAM and Bufferserver are secure. STRAM passes the token over to
-StreamingContainer, which then gives it to the Bufferserver. The most
-important aspect for an application developer is to note that STRAM is
-the single point of access to ensure security measures are taken by all
-components of the platform.
-
-Resource Limits
-----------------------------
-
-Hadoop enforces quotas on resources. This includes hard-disk (name
-space and total disk quota) as well as priority queues for schedulers.
-The platform uses Hadoop resource limits to manage a streaming
-application. In addition network I/O quotas can be enforced. An operator
-can be dynamically partitioned if it reaches its resource limits; these
-limits may be expressed in terms of throughput, latency, or just
-aggregate resource utilization of a container.
-
-
-
-
-
-------------------------------------------------------------------------
-
-
-
-
-
-Scalability and Partitioning
-=========================================
-
-Scalability is a foundational element of this platform and is a
-building block for an eco-system where big-data meets real-time.
-Enterprises need to continually meet SLA as data grows. Without the
-ability to scale as load grows, or new applications with higher loads
-come to fruition, enterprise grade SLA cannot be met. A big issue with
-the streaming application space is that, it is not just about high load,
-but also the fluctuations in it. There is no way to guarantee future
-load requirements and there is a big difference between high and low
-load within a day for the same feed. Traditional streaming platforms
-solve these two cases by simply throwing more hardware at the
-problem.
-
-
-
-Daily spikes are managed by ensuring enough hardware for peak
-load, which then idles during low load, and future needs are handled by
-a very costly re-architecture, or investing heavily in building a
-scalable distributed operating system. Another salient and often
-overlooked cost is the need to manage SLA -- let’s call it  buffer capacity. Since this means computing the
-peak load within required time, that translates to allocating enough
-resources over and above peak load as daily peaks fluctuate. For example
-an average peak load of 100 resource units (cpu and/or memory and/or
-network) may mean allocating about 200 resource units to be safe. A
-distributed cluster that cannot dynamically scale up and down, in effect
-pays buffer capacity per application. Another big aspect of streaming
-applications is that the load is not just ingestion rate, more often
-than not, the internal operators produce lot more events than the
-ingestion rate. For example a dimensional data (with, say  d dimensions) computation needs 2\*d -1 computations per ingested event. A lot
-of applications have over 10 dimensions, i.e over 1000 computations per
-incoming event and these need to be distributed across the cluster,
-thereby causing an explosion in the throughput (events/sec) that needs
-to be managed.
-
-
-
-The platform is designed to handle such cases at a very low cost.
-The platform scales linearly with Hadoop. If applications need more
-resources, the enterprise can simply add more commodity nodes to Hadoop
-without any downtime, and the Hadoop native platform will take care of
-the rest. If some nodes go bad, these can be removed without downtime.
-The daily peaks and valleys in the load are managed by the platform by
-dynamically scaling at the peak and then giving the resources back to
-Hadoop during low load. This means that a properly designed Hadoop
-cluster does several things for enterprises: (a) reduces the cost of
-hardware due to use of commodity hardware (b) shares buffer capacity
-across all applications as peaks of all applications may not align and
-(c) raises the average CPU usage on a 24x7 basis. As a general design
-this is similar to scale that a map-reduce application can deliver. In
-the following sections of this chapter we will see how this is
-done.
-
-Partitioning
--------------------------
-
-If all tuples sent through the stream(s) that are connected to the
-input port(s) of an operator in the DAG are received by a single
-physical instance of that operator, that operator can become a
-performance bottleneck. This leads to scalability issues when
-throughput, memory, or CPU needs exceed the processing capacity of that
-single instance.
-
-
-
-To address the problem, the platform offers the capability to
-partition the inflow of data so that it is divided across multiple
-physical instances of a logical operator in the DAG. There are two
-functional ways to partition
-
--   Load balance: Incoming load is simply partitioned
-    into stream(s) that go to separate instances of physical operators
-    and scalability is achieved via adding more physical operators. Each
-    tuple is sent to physical operator (partition) based on a
-    round-robin or other similar algorithm. This scheme scales linearly.
-    A lot of key based computations can load balance in the platform due
-    to the ability to insert  Unifiers. For many computations, the
-    endWindow and Unifier setup is similar to the combiner and reducer
-    mechanism in a Map-Reduce computation.
--   Sticky Key: The key assertion is that distribution of tuples
-    are sticky, i.e the data with
-    same key will always be processed by the same physical operator, no
-    matter how many times it is sent through the stream. This stickiness
-    will continue even if the number of partitions grows dynamically and
-    can eventually be leveraged for advanced features like
-    bucket testing. How this is accomplished and what is required to
-    develop compliant operators will be explained below.
-
-
-
-We plan to add more partitioning mechanisms proactively to the
-platform over time as needed by emerging usage patterns. The aim is to
-allow enterprises to be able to focus on their business logic, and
-significantly reduce the cost of operability. As an enabling technology
-for managing high loads, this platform provides enterprises with a
-significant innovative edge. Scalability and Partitioning is a
-foundational building block for this platform.
-
-### Sticky Partition vs Round Robin
-
-As noted above, partitioning via sticky key is data aware but
-round-robin partitioning is not. An example for non-sticky load
-balancing would be round robin distribution over multiple instances,
-where for example a tuple stream of  A, A,
-A with 3 physical operator
-instances would result in processing of a single A by each of the instances, In contrast, sticky
-partitioning means that exactly one instance of the operators will
-process all of the  Atuples if they
-fall into the same bucket, while B
-may be processed by another operator. Data aware mapping of
-tuples to partitions (similar to distributed hash table) is accomplished
-via Stream Codecs. In later sections we would show how these two
-approaches can be used in combination.
-
-### Stream Codec
-
-The platform does not make assumptions about the tuple
-type, it could be any Java object. The operator developer knows what
-tuple type an input port expects and is capable of processing. Each
-input port has a stream codec  associated thatdefines how data is serialized when transmitted over a socket
-stream; it also defines another
-function that computes the partition hash key for the tuple. The engine
-uses that key to determine which physical instance(s)  (for a
-partitioned operator) receive that  tuple. For this to work, consistent hashing is required.
-The default codec uses the Java Object\#hashCode function, which is
-sufficient for basic types such as Integer, String etc. It will also
-work with custom tuple classes as long as they implement hashCode
-appropriately. Reliance on hashCode may not work when generic containers
-are used that do not hash the actual data, such as standard collection
-classes (HashMap etc.), in which case a custom stream codec must be
-assigned to the input port.
-
-### Static Partitioning
-
-DAG designers can specify at design time how they would like
-certain operators to be partitioned. STRAM then instantiates the DAG
-with the physical plan which adheres to the partitioning scheme defined
-by the design. This plan is the initial partition of the application. In
-other words, Static Partitioning is used to tell STRAM to compute the
-physical DAG from a logical DAG once, without taking into consideration
-runtime states or loads of various operators.
-
-### Dynamic Partitioning
-
-In streaming applications the load changes during the day, thus
-creating situations where the number of partitioned operator instances
-needs to adjust dynamically. The load can be measured in terms of
-processing within the DAG based on throughput, or latency, or
-considerations in external system components (time based etc.) that the
-platform may not be aware of. Whatever the trigger, the resource
-requirement for the current processing needs to be adjusted at run-time.
-The platform may detect that operator instances are over or under
-utilized and may need to dynamically adjust the number of instances on
-the fly. More instances of a logical operator may be required (partition
-split) or underutilized operator instances may need decommissioning
-(partition merge). We refer to either of the changes as dynamic
-partitioning. The default partitioning scheme supports split and merge
-of partitions, but 

<TRUNCATED>