You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/03/05 02:43:15 UTC

[13/23] incubator-apex-site git commit: from 799499c27927d394756434e945376df6c918afd6

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/42a0b9cb/content/docs/apex-3.0/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/content/docs/apex-3.0/mkdocs/search_index.json b/content/docs/apex-3.0/mkdocs/search_index.json
new file mode 100644
index 0000000..85c9b89
--- /dev/null
+++ b/content/docs/apex-3.0/mkdocs/search_index.json
@@ -0,0 +1,429 @@
+{
+    "docs": [
+        {
+            "location": "/", 
+            "text": "Apache Apex (Incubating)\n\n\nApex is a Hadoop YARN native big data processing platform, enabling real time stream as well as batch processing for your big data.  Apex provides the following benefits:\n\n\n\n\nHigh scalability and performance\n\n\nFault tolerance and state management\n\n\nHadoop-native YARN \n HDFS implementation\n\n\nEvent processing guarantees\n\n\nSeparation of functional and operational concerns\n\n\nSimple API supports generic Java code\n\n\n\n\nPlatform has been demonstated to scale linearly across Hadoop clusters under extreme loads of billions of events per second.  Hardware and process failures are quickly recovered with HDFS-backed checkpointing and automatic operator recovery, preserving application state and resuming execution in seconds.  Functional and operational specifications are separated.  Apex provides a simple API, which enables users to write generic, reusable code.  The code is dropped in as-is and platform automatically h
 andles the various operational concerns, such as state management, fault tolerance, scalability, security, metrics, etc.  This frees users to focus on functional development, and lets platform provide operability support.\n\n\nThe core Apex platform is supplemented by Malhar, a library of connector and logic functions, enabling rapid application development.  These operators and modules provide access to HDFS, S3, NFS, FTP, and other file systems; Kafka, ActiveMQ, RabbitMQ, JMS, and other message systems; MySql, Cassandra, MongoDB, Redis, HBase, CouchDB, generic JDBC, and other database connectors.  In addition to the operators, the library contains a number of demos applications, demonstrating operator features and capabilities.  To see the full list of available operators and related documentation, visit \nApex Malhar on Github\n\n\nFor additional information visit \nApache Apex (incubating)\n.", 
+            "title": "Apache Apex"
+        }, 
+        {
+            "location": "/#apache-apex-incubating", 
+            "text": "Apex is a Hadoop YARN native big data processing platform, enabling real time stream as well as batch processing for your big data.  Apex provides the following benefits:   High scalability and performance  Fault tolerance and state management  Hadoop-native YARN   HDFS implementation  Event processing guarantees  Separation of functional and operational concerns  Simple API supports generic Java code   Platform has been demonstated to scale linearly across Hadoop clusters under extreme loads of billions of events per second.  Hardware and process failures are quickly recovered with HDFS-backed checkpointing and automatic operator recovery, preserving application state and resuming execution in seconds.  Functional and operational specifications are separated.  Apex provides a simple API, which enables users to write generic, reusable code.  The code is dropped in as-is and platform automatically handles the various operational concerns, such as state management
 , fault tolerance, scalability, security, metrics, etc.  This frees users to focus on functional development, and lets platform provide operability support.  The core Apex platform is supplemented by Malhar, a library of connector and logic functions, enabling rapid application development.  These operators and modules provide access to HDFS, S3, NFS, FTP, and other file systems; Kafka, ActiveMQ, RabbitMQ, JMS, and other message systems; MySql, Cassandra, MongoDB, Redis, HBase, CouchDB, generic JDBC, and other database connectors.  In addition to the operators, the library contains a number of demos applications, demonstrating operator features and capabilities.  To see the full list of available operators and related documentation, visit  Apex Malhar on Github  For additional information visit  Apache Apex (incubating) .", 
+            "title": "Apache Apex (Incubating)"
+        }, 
+        {
+            "location": "/apex_development_setup/", 
+            "text": "Apache Apex Development Environment Setup\n\n\nThis document discusses the steps needed for setting up a development environment for creating applications that run on the Apache Apex platform.\n\n\nDevelopment Tools\n\n\nThere are a few tools that will be helpful when developing Apache Apex applications, including:\n\n\n\n\n\n\ngit\n - A revision control system (version 1.7.1 or later). There are multiple git clients available for Windows (\nhttp://git-scm.com/download/win\n for example), so download and install a client of your choice.\n\n\n\n\n\n\njava JDK\n (not JRE) - Includes the Java Runtime Environment as well as the Java compiler and a variety of tools (version 1.7.0_79 or later). Can be downloaded from the Oracle website.\n\n\n\n\n\n\nmaven\n - Apache Maven is a build system for Java projects (version 3.0.5 or later). It can be downloaded from \nhttps://maven.apache.org/download.cgi\n.\n\n\n\n\n\n\nIDE\n (Optional) - If you prefer to use an IDE (Integra
 ted Development Environment) such as \nNetBeans\n, \nEclipse\n or \nIntelliJ\n, install that as well.\n\n\n\n\n\n\nAfter installing these tools, make sure that the directories containing the executable files are in your PATH environment variable.\n\n\n\n\nWindows\n - Open a console window and enter the command \necho %PATH%\n to see the value of the \nPATH\n variable and verify that the above directories for Java, git, and maven executables are present.  JDK executables like \njava\n and \njavac\n, the directory might be something like \nC:\\\\Program Files\\\\Java\\\\jdk1.7.0\\_80\\\\bin\n; for \ngit\n it might be \nC:\\\\Program Files\\\\Git\\\\bin\n; and for maven it might be \nC:\\\\Users\\\\user\\\\Software\\\\apache-maven-3.3.3\\\\bin\n.  If not, you can change its value clicking on the button at \nControl Panel\n \n \nAdvanced System Settings\n \n \nAdvanced tab\n \n \nEnvironment Variables\n.\n\n\nLinux and Mac\n - Open a console/terminal window and enter the command \necho 
 $PATH\n to see the value of the \nPATH\n variable and verify that the above directories for Java, git, and maven executables are present.  If not, make sure software is downloaded and installed, and optionally PATH reference is added and exported  in a \n~/.profile\n or \n~/.bash_profile\n.  For example to add maven located in \n/sfw/maven/apache-maven-3.3.3\n to PATH add the line: \nexport PATH=$PATH:/sfw/maven/apache-maven-3.3.3/bin\n\n\n\n\nConfirm by running the following commands and comparing with output that show in the table below:\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nCommand\n\n\nOutput\n\n\n\n\n\n\njavac -version\n\n\njavac 1.7.0_80\n\n\n\n\n\n\njava -version\n\n\njava version \n1.7.0_80\n\n\nJava(TM) SE Runtime Environment (build 1.7.0_80-b15)\n\n\nJava HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)\n\n\n\n\n\n\ngit --version\n\n\ngit version 2.6.1.windows.1\n\n\n\n\n\n\nmvn --version\n\n\nApache Maven 3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 2015-04-22T06
 :57:37-05:00)\n\n\n...\n\n\n\n\n\n\n\n\n\n\n\nCreating New Apex Project\n\n\nAfter development tools are configured, you can now use the maven archetype to create a basic Apache Apex project.  \nNote:\n When executing the commands below, replace \n3.3.0-incubating\n by \nlatest available version\n of Apache Apex.\n\n\n\n\n\n\nWindows\n - Create a new Windows command file called \nnewapp.cmd\n by copying the lines below, and execute it.  When you run this file, the properties will be displayed and you will be prompted with \nY: :\n; just press \nEnter\n to complete the project generation.  The caret (^) at the end of some lines indicates that a continuation line follows. \n\n\n@echo off\n@rem Script for creating a new application\nsetlocal\nmvn archetype:generate ^\n -DarchetypeGroupId=org.apache.apex ^\n -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.3.0-incubating ^\n -DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp ^\n -Dversion=1.0-SNAPS
 HOT\nendlocal\n\n\n\n\n\n\n\nLinux\n - Execute the lines below in a terminal window.  New project will be created in the curent working directory.  The backslash (\\) at the end of the lines indicates continuation.\n\n\nmvn archetype:generate \\\n -DarchetypeGroupId=org.apache.apex \\\n -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.2.0-incubating \\\n -DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp \\\n -Dversion=1.0-SNAPSHOT\n\n\n\n\n\n\n\nWhen the run completes successfully, you should see a new directory named \nmyapexapp\n containing a maven project for building a basic Apache Apex application. It includes 3 source files:\nApplication.java\n,  \nRandomNumberGenerator.java\n and \nApplicationTest.java\n. You can now build the application by stepping into the new directory and running the maven package command:\n\n\ncd myapexapp\nmvn clean package -DskipTests\n\n\n\nThe build should create the application package file \nmyapexapp/target
 /myapexapp-1.0-SNAPSHOT.apa\n. This application package can then be used to launch example application via \ndtCli\n, or other visual management tools.  When running, this application will generate a stream of random numbers and print them out, each prefixed by the string \nhello world:\n.\n\n\nBuilding Apex Demos\n\n\nIf you want to see more substantial Apex demo applications and the associated source code, you can follow these simple steps to check out and build them.\n\n\n\n\n\n\nCheck out the source code repositories:\n\n\ngit clone https://github.com/apache/incubator-apex-core\ngit clone https://github.com/apache/incubator-apex-malhar\n\n\n\n\n\n\n\nSwitch to the appropriate release branch and build each repository:\n\n\ncd incubator-apex-core\nmvn clean install -DskipTests\n\ncd incubator-apex-malhar\nmvn clean install -DskipTests\n\n\n\n\n\n\n\nThe \ninstall\n argument to the \nmvn\n command installs resources from each project to your local maven repository (typically \n.m2/
 repository\n under your home directory), and \nnot\n to the system directories, so Administrator privileges are not required. The  \n-DskipTests\n argument skips running unit tests since they take a long time. If this is a first-time installation, it might take several minutes to complete because maven will download a number of associated plugins.\n\n\nAfter the build completes, you should see the demo application package files in the target directory under each demo subdirectory in \nincubator-apex-malhar/demos\n.\n\n\nSandbox\n\n\nTo jump start development with an Apache Hadoop single node cluster, \nDataTorrent Sandbox\n powered by VirtualBox is available on Windows, Linux, or Mac platforms.  The sandbox is configured by default to run with 6GB RAM; if your development machine has 16GB or more, you can increase the sandbox RAM to 8GB or more using the VirtualBox console.  This will yield better performance and support larger applications.  The advantage of developing in the sandb
 ox is that most of the tools (e.g. \njdk\n, \ngit\n, \nmaven\n), Hadoop YARN and HDFS, and a distribution of Apache Apex and DataTorrent RTS are pre-installed.  The disadvantage is that the sandbox is a memory-limited environment, and requires settings changes and restarts to adjust memory available for development and testing.", 
+            "title": "Development Setup"
+        }, 
+        {
+            "location": "/apex_development_setup/#apache-apex-development-environment-setup", 
+            "text": "This document discusses the steps needed for setting up a development environment for creating applications that run on the Apache Apex platform.", 
+            "title": "Apache Apex Development Environment Setup"
+        }, 
+        {
+            "location": "/apex_development_setup/#development-tools", 
+            "text": "There are a few tools that will be helpful when developing Apache Apex applications, including:    git  - A revision control system (version 1.7.1 or later). There are multiple git clients available for Windows ( http://git-scm.com/download/win  for example), so download and install a client of your choice.    java JDK  (not JRE) - Includes the Java Runtime Environment as well as the Java compiler and a variety of tools (version 1.7.0_79 or later). Can be downloaded from the Oracle website.    maven  - Apache Maven is a build system for Java projects (version 3.0.5 or later). It can be downloaded from  https://maven.apache.org/download.cgi .    IDE  (Optional) - If you prefer to use an IDE (Integrated Development Environment) such as  NetBeans ,  Eclipse  or  IntelliJ , install that as well.    After installing these tools, make sure that the directories containing the executable files are in your PATH environment variable.   Windows  - Open a console window and
  enter the command  echo %PATH%  to see the value of the  PATH  variable and verify that the above directories for Java, git, and maven executables are present.  JDK executables like  java  and  javac , the directory might be something like  C:\\\\Program Files\\\\Java\\\\jdk1.7.0\\_80\\\\bin ; for  git  it might be  C:\\\\Program Files\\\\Git\\\\bin ; and for maven it might be  C:\\\\Users\\\\user\\\\Software\\\\apache-maven-3.3.3\\\\bin .  If not, you can change its value clicking on the button at  Control Panel     Advanced System Settings     Advanced tab     Environment Variables .  Linux and Mac  - Open a console/terminal window and enter the command  echo $PATH  to see the value of the  PATH  variable and verify that the above directories for Java, git, and maven executables are present.  If not, make sure software is downloaded and installed, and optionally PATH reference is added and exported  in a  ~/.profile  or  ~/.bash_profile .  For example to add maven located in  /sf
 w/maven/apache-maven-3.3.3  to PATH add the line:  export PATH=$PATH:/sfw/maven/apache-maven-3.3.3/bin   Confirm by running the following commands and comparing with output that show in the table below:         Command  Output    javac -version  javac 1.7.0_80    java -version  java version  1.7.0_80  Java(TM) SE Runtime Environment (build 1.7.0_80-b15)  Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)    git --version  git version 2.6.1.windows.1    mvn --version  Apache Maven 3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 2015-04-22T06:57:37-05:00)  ...", 
+            "title": "Development Tools"
+        }, 
+        {
+            "location": "/apex_development_setup/#creating-new-apex-project", 
+            "text": "After development tools are configured, you can now use the maven archetype to create a basic Apache Apex project.   Note:  When executing the commands below, replace  3.3.0-incubating  by  latest available version  of Apache Apex.    Windows  - Create a new Windows command file called  newapp.cmd  by copying the lines below, and execute it.  When you run this file, the properties will be displayed and you will be prompted with  Y: : ; just press  Enter  to complete the project generation.  The caret (^) at the end of some lines indicates that a continuation line follows.   @echo off\n@rem Script for creating a new application\nsetlocal\nmvn archetype:generate ^\n -DarchetypeGroupId=org.apache.apex ^\n -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.3.0-incubating ^\n -DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp ^\n -Dversion=1.0-SNAPSHOT\nendlocal    Linux  - Execute the lines below in a terminal window.  New projec
 t will be created in the curent working directory.  The backslash (\\) at the end of the lines indicates continuation.  mvn archetype:generate \\\n -DarchetypeGroupId=org.apache.apex \\\n -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.2.0-incubating \\\n -DgroupId=com.example -Dpackage=com.example.myapexapp -DartifactId=myapexapp \\\n -Dversion=1.0-SNAPSHOT    When the run completes successfully, you should see a new directory named  myapexapp  containing a maven project for building a basic Apache Apex application. It includes 3 source files: Application.java ,   RandomNumberGenerator.java  and  ApplicationTest.java . You can now build the application by stepping into the new directory and running the maven package command:  cd myapexapp\nmvn clean package -DskipTests  The build should create the application package file  myapexapp/target/myapexapp-1.0-SNAPSHOT.apa . This application package can then be used to launch example application via  dtCli , or other visual 
 management tools.  When running, this application will generate a stream of random numbers and print them out, each prefixed by the string  hello world: .", 
+            "title": "Creating New Apex Project"
+        }, 
+        {
+            "location": "/apex_development_setup/#building-apex-demos", 
+            "text": "If you want to see more substantial Apex demo applications and the associated source code, you can follow these simple steps to check out and build them.    Check out the source code repositories:  git clone https://github.com/apache/incubator-apex-core\ngit clone https://github.com/apache/incubator-apex-malhar    Switch to the appropriate release branch and build each repository:  cd incubator-apex-core\nmvn clean install -DskipTests\n\ncd incubator-apex-malhar\nmvn clean install -DskipTests    The  install  argument to the  mvn  command installs resources from each project to your local maven repository (typically  .m2/repository  under your home directory), and  not  to the system directories, so Administrator privileges are not required. The   -DskipTests  argument skips running unit tests since they take a long time. If this is a first-time installation, it might take several minutes to complete because maven will download a number of associated plugins.  A
 fter the build completes, you should see the demo application package files in the target directory under each demo subdirectory in  incubator-apex-malhar/demos .", 
+            "title": "Building Apex Demos"
+        }, 
+        {
+            "location": "/apex_development_setup/#sandbox", 
+            "text": "To jump start development with an Apache Hadoop single node cluster,  DataTorrent Sandbox  powered by VirtualBox is available on Windows, Linux, or Mac platforms.  The sandbox is configured by default to run with 6GB RAM; if your development machine has 16GB or more, you can increase the sandbox RAM to 8GB or more using the VirtualBox console.  This will yield better performance and support larger applications.  The advantage of developing in the sandbox is that most of the tools (e.g.  jdk ,  git ,  maven ), Hadoop YARN and HDFS, and a distribution of Apache Apex and DataTorrent RTS are pre-installed.  The disadvantage is that the sandbox is a memory-limited environment, and requires settings changes and restarts to adjust memory available for development and testing.", 
+            "title": "Sandbox"
+        }, 
+        {
+            "location": "/application_development/", 
+            "text": "Application Developer Guide\n\n\nThe Apex platform is designed to process massive amounts of\nreal-time events natively in Hadoop.  It runs as a YARN (Hadoop 2.x) \napplication and leverages Hadoop as a distributed operating\nsystem.  All the basic distributed operating system capabilities of\nHadoop like resource management (YARN), distributed file system (HDFS),\nmulti-tenancy, security, fault-tolerance, and scalability are supported natively \nin all the Apex applications. \u00a0The platform handles all the details of the application \nexecution, including dynamic scaling, state checkpointing and recovery, event \nprocessing guarantees, etc. allowing you to focus on writing your application logic without\nmixing operational and functional concerns.\n\n\nIn the platform, building a streaming application can be extremely\neasy and intuitive. \u00a0The application is represented as a Directed\nAcyclic Graph (DAG) of computation units called \nOperators\n interco
 nnected\nby the data-flow edges called  \nStreams\n.\u00a0The operators process input\nstreams and produce output streams. A library of common operators is\nprovided to enable quick application development. \u00a0In case the desired\nprocessing is not available in the Operator Library, one can easily\nwrite a custom operator. We refer those interested in creating their own\noperators to the \nOperator Development Guide\n.\n\n\nRunning A Test Application\n\n\nIf you are starting with the Apex platform for the first time,\nit can be informative to launch an existing application and see it run.\nOne of the simplest examples provided in \nApex-Malhar repository\n is a Pi demo application,\nwhich computes the value of PI using random numbers.  After \nsetting up development environment\n\nPi demo can be launched as follows:\n\n\n\n\nOpen up Apex Malhar files in your IDE (for example Eclipse, IntelliJ, NetBeans, etc)\n\n\nNavigate to \ndemos/pi/src/test/java/com/datatorrent/demos/Applicat
 ionTest.java\n\n\nRun the test for ApplicationTest.java\n\n\nView the output in system console\n\n\n\n\nCongratulations, you just ran your first real-time streaming demo :) \nThis demo is very simple and has four operators. The first operator\nemits random integers between 0 to 30, 000. The second operator receives\nthese coefficients and emits a hashmap with x and y values each time it\nreceives two values. The third operator takes these values and computes\nx**2+y**2. The last operator counts how many computed values from\nthe previous operator were less than or equal to 30, 000**2. Assuming\nthis count is N, then PI is computed as N/number of values received.\nHere is the code snippet for the PI application. This code populates the\nDAG. Do not worry about what each line does, we will cover these\nconcepts later in this document.\n\n\n// Generates random numbers\nRandomEventGenerator rand = dag.addOperator(\nrand\n, new RandomEventGenerator());\nrand.setMinvalue(0);\nrand.setMaxv
 alue(30000);\n\n// Generates a round robin HashMap of \nx\n and \ny\n\nRoundRobinHashMap\nString,Object\n rrhm = dag.addOperator(\nrrhm\n, new RoundRobinHashMap\nString, Object\n());\nrrhm.setKeys(new String[] { \nx\n, \ny\n });\n\n// Calculates pi from x and y\nJavaScriptOperator calc = dag.addOperator(\npicalc\n, new Script());\ncalc.setPassThru(false);\ncalc.put(\ni\n,0);\ncalc.put(\ncount\n,0);\ncalc.addSetupScript(\nfunction pi() { if (x*x+y*y \n= \n+maxValue*maxValue+\n) { i++; } count++; return i / count * 4; }\n);\ncalc.setInvoke(\npi\n);\ndag.addStream(\nrand_rrhm\n, rand.integer_data, rrhm.data);\ndag.addStream(\nrrhm_calc\n, rrhm.map, calc.inBindings);\n\n// puts results on system console\nConsoleOutputOperator console = dag.addOperator(\nconsole\n, new ConsoleOutputOperator());\ndag.addStream(\nrand_console\n,calc.result, console.input);\n\n\n\n\nYou can review the other demos and see what they do. The examples\ngiven in the Demos project cover various features of the pl
 atform and we\nstrongly encourage you to read these to familiarize yourself with the\nplatform. In the remaining part of this document we will go through\ndetails needed for you to develop and run streaming applications in\nMalhar.\n\n\nTest Application: Yahoo! Finance Quotes\n\n\nThe PI\u00a0application was to\nget you started. It is a basic application and does not fully illustrate\nthe features of the platform. For the purpose of describing concepts, we\nwill consider the test application shown in Figure 1. The application\ndownloads tick data from  \nYahoo! Finance\n \u00a0and computes the\nfollowing for four tickers, namely \nIBM\n,\n\nGOOG\n, \nYHOO\n.\n\n\n\n\nQuote: Consisting of last trade price, last trade time, and\n    total volume for the day\n\n\nPer-minute chart data: Highest trade price, lowest trade\n    price, and volume during that minute\n\n\nSimple Moving Average: trade price over 5 minutes\n\n\n\n\nTotal volume must ensure that all trade volume for that day is\
 nadded, i.e. data loss would result in wrong results. Charting data needs\nall the trades in the same minute to go to the same slot, and then on it\nstarts afresh, so again data loss would result in wrong results. The\naggregation for charting data is done over 1 minute. Simple moving\naverage computes the average price over a 5 minute sliding window; it\ntoo would produce wrong results if there is data loss. Figure 1 shows\nthe application with no partitioning.\n\n\n\n\nThe operator StockTickerInput:\u00a0StockTickerInput\n\u00a0\nis\nthe input operator that reads live data from Yahoo! Finance once per\ninterval (user configurable in milliseconds), and emits the price, the\nincremental volume, and the last trade time of each stock symbol, thus\nemulating real ticks from the exchange. \u00a0We utilize the Yahoo! Finance\nCSV web service interface. \u00a0For example:\n\n\n$ GET 'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO\nf=sl1vt1'\n\nIBM\n,203.966,1513041,\n
 1:43pm\n\n\nGOOG\n,762.68,1879741,\n1:43pm\n\n\nAAPL\n,444.3385,11738366,\n1:43pm\n\n\nYHOO\n,19.3681,14707163,\n1:43pm\n\n\n\n\n\nAmong all the operators in Figure 1, StockTickerInput is the only\noperator that requires extra code because it contains a custom mechanism\nto get the input data. \u00a0Other operators are used unchanged from the\nMalhar library.\n\n\nHere is the class implementation for StockTickInput:\n\n\npackage com.datatorrent.demos.yahoofinance;\n\nimport au.com.bytecode.opencsv.CSVReader;\nimport com.datatorrent.annotation.OutputPortFieldAnnotation;\nimport com.datatorrent.api.Context.OperatorContext;\nimport com.datatorrent.api.DefaultOutputPort;\nimport com.datatorrent.api.InputOperator;\nimport com.datatorrent.lib.util.KeyValPair;\nimport java.io.IOException;\nimport java.io.InputStream;\nimport java.io.InputStreamReader;\nimport java.util.*;\nimport org.apache.commons.httpclient.HttpClient;\nimport org.apache.commons.httpclient.HttpStatus;\nimport org.apache.
 commons.httpclient.cookie.CookiePolicy;\nimport org.apache.commons.httpclient.methods.GetMethod;\nimport org.apache.commons.httpclient.params.DefaultHttpParams;\nimport org.slf4j.Logger;\nimport org.slf4j.LoggerFactory;\n\n/**\n * This operator sends price, volume and time into separate ports and calculates incremental volume.\n */\npublic class StockTickInput implements InputOperator\n{\n  private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);\n  /**\n   * Timeout interval for reading from server. 0 or negative indicates no timeout.\n   */\n  public int readIntervalMillis = 500;\n  /**\n   * The URL of the web service resource for the POST request.\n   */\n  private String url;\n  public String[] symbols;\n  private transient HttpClient client;\n  private transient GetMethod method;\n  private HashMap\nString, Long\n lastVolume = new HashMap\nString, Long\n();\n  private boolean outputEvenIfZeroVolume = false;\n  /**\n   * The output port to emit price.
 \n   */\n  @OutputPortFieldAnnotation(optional = true)\n  public final transient DefaultOutputPort\nKeyValPair\nString, Double\n price = new DefaultOutputPort\nKeyValPair\nString, Double\n();\n  /**\n   * The output port to emit incremental volume.\n   */\n  @OutputPortFieldAnnotation(optional = true)\n  public final transient DefaultOutputPort\nKeyValPair\nString, Long\n volume = new DefaultOutputPort\nKeyValPair\nString, Long\n();\n  /**\n   * The output port to emit last traded time.\n   */\n  @OutputPortFieldAnnotation(optional = true)\n  public final transient DefaultOutputPort\nKeyValPair\nString, String\n time = new DefaultOutputPort\nKeyValPair\nString, String\n();\n\n  /**\n   * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO\nf=sl1vt1\n   *\n   * @return the URL\n   */\n  private String prepareURL()\n  {\n    String str = \nhttp://download.finance.yahoo.com/d/quotes.csv?s=\n;\n    for 
 (int i = 0; i \n symbols.length; i++) {\n      if (i != 0) {\n        str += \n,\n;\n      }\n      str += symbols[i];\n    }\n    str += \nf=sl1vt1\ne=.csv\n;\n    return str;\n  }\n\n  @Override\n  public void setup(OperatorContext context)\n  {\n    url = prepareURL();\n    client = new HttpClient();\n    method = new GetMethod(url);\n    DefaultHttpParams.getDefaultParams().setParameter(\nhttp.protocol.cookie-policy\n, CookiePolicy.BROWSER_COMPATIBILITY);\n  }\n\n  @Override\n  public void teardown()\n  {\n  }\n\n  @Override\n  public void emitTuples()\n  {\n\n    try {\n      int statusCode = client.executeMethod(method);\n      if (statusCode != HttpStatus.SC_OK) {\n        System.err.println(\nMethod failed: \n + method.getStatusLine());\n      }\n      else {\n        InputStream istream = method.getResponseBodyAsStream();\n        // Process response\n        InputStreamReader isr = new InputStreamReader(istream);\n        CSVReader reader = new CSVReader(isr);\n        Lis
 t\nString[]\n myEntries = reader.readAll();\n        for (String[] stringArr: myEntries) {\n          ArrayList\nString\n tuple = new ArrayList\nString\n(Arrays.asList(stringArr));\n          if (tuple.size() != 4) {\n            return;\n          }\n          // input csv is \nSymbol\n,\nPrice\n,\nVolume\n,\nTime\n\n          String symbol = tuple.get(0);\n          double currentPrice = Double.valueOf(tuple.get(1));\n          long currentVolume = Long.valueOf(tuple.get(2));\n          String timeStamp = tuple.get(3);\n          long vol = currentVolume;\n          // Sends total volume in first tick, and incremental volume afterwards.\n          if (lastVolume.containsKey(symbol)) {\n            vol -= lastVolume.get(symbol);\n          }\n\n          if (vol \n 0 || outputEvenIfZeroVolume) {\n            price.emit(new KeyValPair\nString, Double\n(symbol, currentPrice));\n            volume.emit(new KeyValPair\nString, Long\n(symbol, vol));\n            time.emit(new KeyValPair
 \nString, String\n(symbol, timeStamp));\n            lastVolume.put(symbol, currentVolume);\n          }\n        }\n      }\n      Thread.sleep(readIntervalMillis);\n    }\n    catch (InterruptedException ex) {\n      logger.debug(ex.toString());\n    }\n    catch (IOException ex) {\n      logger.debug(ex.toString());\n    }\n  }\n\n  @Override\n  public void beginWindow(long windowId)\n  {\n  }\n\n  @Override\n  public void endWindow()\n  {\n  }\n\n  public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume)\n  {\n       this.outputEvenIfZeroVolume = outputEvenIfZeroVolume;\n  }\n\n}\n\n\n\n\nThe operator has three output ports that emit the price of the\nstock, the volume of the stock and the last trade time of the stock,\ndeclared as public member variables price, volume\u00a0and  time\u00a0of the class. \u00a0The tuple of the\nprice\u00a0output port is a key-value\npair with the stock symbol being the key, and the price being the value.\n\u00a0The tuple of the volume
 \u00a0output\nport is a key value pair with the stock symbol being the key, and the\nincremental volume being the value. \u00a0The tuple of the  time\u00a0output port is a key value pair with the\nstock symbol being the key, and the last trade time being the\nvalue.\n\n\nImportant: Since operators will be\nserialized, all input and output ports need to be declared transient\nbecause they are stateless and should not be serialized.\n\n\nThe method\u00a0setup(OperatorContext)\ncontains the code that is necessary for setting up the HTTP\nclient for querying Yahoo! Finance.\n\n\nMethod\u00a0emitTuples() contains\nthe code that reads from Yahoo! Finance, and emits the data to the\noutput ports of the operator. \u00a0emitTuples()\u00a0will be called one or more times\nwithin one application window as long as time is allowed within the\nwindow.\n\n\nNote that we want to emulate the tick input stream by having\nincremental volume data with Yahoo! Finance data. \u00a0We therefore subtract\nt
 he previous volume from the current volume to emulate incremental\nvolume for each tick.\n\n\nThe operator\nDailyVolume:\u00a0This operator\nreads from the input port, which contains the incremental volume tuples\nfrom StockTickInput, and\naggregates the data to provide the cumulative volume. \u00a0It uses the\nlibrary class  SumKeyVal\nK,V\n\u00a0provided in math\u00a0package. \u00a0In this case,\nSumKeyVal\nString,Long\n, where K is the stock symbol, V is the\naggregated volume, with cumulative\nset to true. (Otherwise if  cumulativewas set to false, SumKeyVal would\nprovide the sum for the application window.) \u00a0Malhar provides a number\nof built-in operators for simple operations like this so that\napplication developers do not have to write them. \u00a0More examples to\nfollow. This operator assumes that the application restarts before the\nmarket opens every day.\n\n\nThe operator Quote:\nThis operator has three input ports, which are price (from\nStockTickInput), daily_vo
 l (from\nDaily Volume), and time (from\n StockTickInput). \u00a0This operator\njust consolidates the three data items and and emits the consolidated\ndata. \u00a0It utilizes the class ConsolidatorKeyVal\nK\n\u00a0from the\nstream\u00a0package.\n\n\nThe operator HighLow:\u00a0This operator reads from the input port,\nwhich contains the price tuples from StockTickInput, and provides the high and the\nlow price within the application window. \u00a0It utilizes the library class\n RangeKeyVal\nK,V\n\u00a0provided\nin the math\u00a0package. In this case,\nRangeKeyVal\nString,Double\n.\n\n\nThe operator MinuteVolume:\nThis operator reads from the input port, which contains the\nvolume tuples from StockTickInput,\nand aggregates the data to provide the sum of the volume within one\nminute. \u00a0Like the operator  DailyVolume, this operator also uses\nSumKeyVal\nString,Long\n, but\nwith cumulative set to false. \u00a0The\nApplication Window is set to one minute. We will explain how to set t
 his\nlater.\n\n\nThe operator Chart:\nThis operator is very similar to the operator Quote, except that it takes inputs from\nHigh Low\u00a0and  Minute Vol\u00a0and outputs the consolidated tuples\nto the output port.\n\n\nThe operator PriceSMA:\nSMA stands for - Simple Moving Average. It reads from the\ninput port, which contains the price tuples from StockTickInput, and\nprovides the moving average price of the stock. \u00a0It utilizes\nSimpleMovingAverage\nString,Double\n, which is provided in the\n multiwindow\u00a0package.\nSimpleMovingAverage keeps track of the data of the previous N\napplication windows in a sliding manner. \u00a0For each end window event, it\nprovides the average of the data in those application windows.\n\n\nThe operator Console:\nThis operator just outputs the input tuples to the console\n(or stdout). \u00a0In this example, there are four console\u00a0operators, which connect to the output\nof  Quote, Chart, PriceSMA and VolumeSMA. \u00a0In\npractice, they 
 should be replaced by operators that use the data to\nproduce visualization artifacts like charts.\n\n\nConnecting the operators together and constructing the\nDAG:\u00a0Now that we know the\noperators used, we will create the DAG, set the streaming window size,\ninstantiate the operators, and connect the operators together by adding\nstreams that connect the output ports with the input ports among those\noperators. \u00a0This code is in the file  YahooFinanceApplication.java. Refer to Figure 1\nagain for the graphical representation of the DAG. \u00a0The last method in\nthe code, namely getApplication(),\ndoes all that. \u00a0The rest of the methods are just for setting up the\noperators.\n\n\npackage com.datatorrent.demos.yahoofinance;\n\nimport com.datatorrent.api.ApplicationFactory;\nimport com.datatorrent.api.Context.OperatorContext;\nimport com.datatorrent.api.DAG;\nimport com.datatorrent.api.Operator.InputPort;\nimport com.datatorrent.lib.io.ConsoleOutputOperator;\nimport com
 .datatorrent.lib.math.RangeKeyVal;\nimport com.datatorrent.lib.math.SumKeyVal;\nimport com.datatorrent.lib.multiwindow.SimpleMovingAverage;\nimport com.datatorrent.lib.stream.ConsolidatorKeyVal;\nimport com.datatorrent.lib.util.HighLow;\nimport org.apache.hadoop.conf.Configuration;\n\n/**\n * Yahoo! Finance application demo. \np\n\n *\n * Get Yahoo finance feed and calculate minute price range, minute volume, simple moving average of 5 minutes.\n */\npublic class Application implements StreamingApplication\n{\n  private int streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms)\n  private int appWindowCountMinute = 60;   // 1 minute\n  private int appWindowCountSMA = 5 * 60;  // 5 minute\n\n  /**\n   * Get actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded price.\n   */\n  public StockTickInput getStockTickInputOperator(String name, DAG dag)\n  {\n    StockTickInput oper = dag.addOperator(name, StockTickInput.class);\n    oper.readI
 ntervalMillis = 200;\n    return oper;\n  }\n\n  /**\n   * This sends total daily volume by adding volumes from each ticks.\n   */\n  public SumKeyVal\nString, Long\n getDailyVolumeOperator(String name, DAG dag)\n  {\n    SumKeyVal\nString, Long\n oper = dag.addOperator(name, new SumKeyVal\nString, Long\n());\n    oper.setType(Long.class);\n    oper.setCumulative(true);\n    return oper;\n  }\n\n  /**\n   * Get aggregated volume of 1 minute and send at the end window of 1 minute.\n   */\n  public SumKeyVal\nString, Long\n getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)\n  {\n    SumKeyVal\nString, Long\n oper = dag.addOperator(name, new SumKeyVal\nString, Long\n());\n    oper.setType(Long.class);\n    oper.setEmitOnlyWhenChanged(true);\ndag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);\n    return oper;\n  }\n\n  /**\n   * Get High-low range for 1 minute.\n   */\n  public RangeKeyVal\nString, Double\n getHighLow
 Operator(String name, DAG dag, int appWindowCount)\n  {\n    RangeKeyVal\nString, Double\n oper = dag.addOperator(name, new RangeKeyVal\nString, Double\n());\n    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);\n    oper.setType(Double.class);\n    return oper;\n  }\n\n  /**\n   * Quote (Merge price, daily volume, time)\n   */\n  public ConsolidatorKeyVal\nString,Double,Long,String,?,?\n getQuoteOperator(String name, DAG dag)\n  {\n    ConsolidatorKeyVal\nString,Double,Long,String,?,?\n oper = dag.addOperator(name, new ConsolidatorKeyVal\nString,Double,Long,String,Object,Object\n());\n    return oper;\n  }\n\n  /**\n   * Chart (Merge minute volume and minute high-low)\n   */\n  public ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n getChartOperator(String name, DAG dag)\n  {\n    ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n oper = dag.addOperator(name, new ConsolidatorKeyVal\nString,HighLow,Long,Object,Object,Object\n());\n 
    return oper;\n  }\n\n  /**\n   * Get simple moving average of price.\n   */\n  public SimpleMovingAverage\nString, Double\n getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)\n  {\n    SimpleMovingAverage\nString, Double\n oper = dag.addOperator(name, new SimpleMovingAverage\nString, Double\n());\n    oper.setWindowSize(appWindowCount);\n    oper.setType(Double.class);\n    return oper;\n  }\n\n  /**\n   * Get console for output.\n   */\n  public InputPort\nObject\n getConsole(String name, /*String nodeName,*/ DAG dag, String prefix)\n  {\n    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);\n    oper.setStringFormat(prefix + \n: %s\n);\n    return oper.input;\n  }\n\n  /**\n   * Create Yahoo Finance Application DAG.\n   */\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  {\n    dag.getAttributes().put(DAG.STRAM_WINDOW_SIZE_MILLIS,streamingWindowSizeMilliSeconds);\n\n    StockTickInput tick = getSto
 ckTickInputOperator(\nStockTickInput\n, dag);\n    SumKeyVal\nString, Long\n dailyVolume = getDailyVolumeOperator(\nDailyVolume\n, dag);\n    ConsolidatorKeyVal\nString,Double,Long,String,?,?\n quoteOperator = getQuoteOperator(\nQuote\n, dag);\n\n    RangeKeyVal\nString, Double\n highlow = getHighLowOperator(\nHighLow\n, dag, appWindowCountMinute);\n    SumKeyVal\nString, Long\n minuteVolume = getMinuteVolumeOperator(\nMinuteVolume\n, dag, appWindowCountMinute);\n    ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n chartOperator = getChartOperator(\nChart\n, dag);\n\n    SimpleMovingAverage\nString, Double\n priceSMA = getPriceSimpleMovingAverageOperator(\nPriceSMA\n, dag, appWindowCountSMA);\n       DefaultPartitionCodec\nString, Double\n codec = new DefaultPartitionCodec\nString, Double\n();\n    dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);\n    dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);\n    dag.addStream(\nprice\n, tick
 .price, quoteOperator.in1, highlow.data, priceSMA.data);\n    dag.addStream(\nvol\n, tick.volume, dailyVolume.data, minuteVolume.data);\n    dag.addStream(\ntime\n, tick.time, quoteOperator.in3);\n    dag.addStream(\ndaily_vol\n, dailyVolume.sum, quoteOperator.in2);\n\n    dag.addStream(\nquote_data\n, quoteOperator.out, getConsole(\nquoteConsole\n, dag, \nQUOTE\n));\n\n    dag.addStream(\nhigh_low\n, highlow.range, chartOperator.in1);\n    dag.addStream(\nvol_1min\n, minuteVolume.sum, chartOperator.in2);\n    dag.addStream(\nchart_data\n, chartOperator.out, getConsole(\nchartConsole\n, dag, \nCHART\n));\n\n    dag.addStream(\nsma_price\n, priceSMA.doubleSMA, getConsole(\npriceSMAConsole\n, dag, \nPrice SMA\n));\n\n    return dag;\n  }\n\n}\n\n\n\n\nNote that we also set a user-specific sliding window for SMA that\nkeeps track of the previous N data points. \u00a0Do not confuse this with the\nattribute APPLICATION_WINDOW_COUNT.\n\n\nIn the rest of this chapter we will run through th
 e process of\nrunning this application. We assume that \u00a0you are familiar with details\nof your Hadoop infrastructure. For installation\ndetails please refer to the \nInstallation Guide\n.\n\n\nRunning a Test Application\n\n\nWe will now describe how to run the yahoo\nfinance application\u00a0described above in different modes\n(local mode, single node on Hadoop, and multi-nodes on Hadoop).\n\n\nThe platform runs streaming applications under the control of a\nlight-weight Streaming Application Manager (STRAM). Each application has\nits own instance of STRAM. STRAM launches the application and\ncontinually provides run time monitoring, analysis, and takes action\nsuch as load scaling or outage recovery as needed. \u00a0We will discuss\nSTRAM in more detail in the next chapter.\n\n\nThe instructions below assume that the platform was installed in a\ndirectory \nINSTALL_DIR\n and the command line interface (CLI) will\nbe used to launch the demo application. An application can be ru
 n in\nlocal mode\u00a0(in IDE or from command line) or on a Hadoop cluster.\n\n\nTo start the dtCli run\n\n\nINSTALL_DIR\n/bin/dtcli\n\n\n\nThe command line prompt appears.  To start the application in local mode (the actual version number in the file name may differ)\n\n\ndt\n launch -local \nINSTALL_DIR\n/yahoo-finance-demo-3.2.0-SNAPSHOT.apa\n\n\n\nTo terminate the application in local mode, enter Ctrl-C\n\n\nTu run the application on the Hadoop cluster (the actual version\nnumber in the file name may differ)\n\n\ndt\n launch \nINSTALL_DIR\n/yahoo-finance-demo-3.2.0-SNAPSHOT.apa\n\n\n\nTo stop the application running in Hadoop, terminate it in the dtCli:\n\n\ndt\n kill-app\n\n\n\nExecuting the application in either mode includes the following\nsteps. At a top level, STRAM (Streaming Application Manager) validates\nthe application (DAG), translates the logical plan to the physical plan\nand then launches the execution engine. The mode determines the\nresources needed and how how t
 hey are used.\n\n\nLocal Mode\n\n\nIn local mode, the application is run as a single-process\u00a0with multiple threads. Although a\nfew Hadoop classes are needed, there is no dependency on a Hadoop\ncluster or Hadoop services. The local file system is used in place of\nHDFS. This mode allows a quick run of an application in a single process\nsandbox, and hence is the most suitable to debug and analyze the\napplication logic. This mode is recommended for developing the\napplication and can be used for running applications within the IDE for\nfunctional testing purposes. Due to limited resources and lack \u00a0of\nscalability an application running in this single process mode is more\nlikely to encounter throughput bottlenecks. A distributed cluster is\nrecommended for benchmarking and production testing.\n\n\nHadoop Cluster\n\n\nIn this section we discuss various Hadoop cluster setups.\n\n\nSingle Node Cluster\n\n\nIn a single node Hadoop cluster all services are deployed on a\nsing
 le server (a developer can use his/her development machine as a\nsingle node cluster). The platform does not distinguish between a single\nor multi-node setup and behaves exactly the same in both cases.\n\n\nIn this mode, the resource manager, name node, data node, and node\nmanager occupy one process each. This is an example of running a\nstreaming application as a multi-process\u00a0application on the same server.\nWith prevalence of fast, multi-core systems, this mode is effective for\ndebugging, fine tuning, and generic analysis before submitting the job\nto a larger Hadoop cluster. In this mode, execution uses the Hadoop\nservices and hence is likely to identify issues that are related to the\nHadoop environment (such issues will not be uncovered in local mode).\nThe throughput will obviously not be as high as on a multi-node Hadoop\ncluster. Additionally, since each container (i.e. Java process) requires\na significant amount of memory, you will be able to run a much smaller\n
 number of containers than on a multi-node cluster.\n\n\nMulti-Node Cluster\n\n\nIn a multi-node Hadoop cluster all the services of Hadoop are\ntypically distributed across multiple nodes in a production or\nproduction-level test environment. Upon launch the application is\nsubmitted to the Hadoop cluster and executes as a  multi-processapplication on\u00a0multiple nodes.\n\n\nBefore you start deploying, testing and troubleshooting your\napplication on a cluster, you should ensure that Hadoop (version 2.2.0\nor later)\u00a0is properly installed and\nyou have basic skills for working with it.\n\n\n\n\nApache Apex Platform Overview\n\n\nStreaming Computational Model\n\n\nIn this chapter, we describe the the basics of the real-time streaming platform and its computational model.\n\n\nThe platform is designed to enable completely asynchronous real time computations\u00a0done in as unblocked a way as possible with\nminimal overhead .\n\n\nApplications running in the platform are represent
 ed by a Directed\nAcyclic Graph (DAG) made up of \u00a0operators and streams. All computations\nare done in memory on arrival of\nthe input data, with an option to save the output to disk (HDFS) in a\nnon-blocking way. The data that flows between operators consists of\natomic data elements. Each data element along with its type definition\n(henceforth called  schema) is\ncalled a tuple.\u00a0An application is a\ndesign of the flow of these tuples to and from\nthe appropriate compute units to enable the computation of the final\ndesired results.\u00a0A message queue (henceforth called\n\u00a0buffer server) manages tuples streaming\nbetween compute units in different processes.This server keeps track of\nall consumers, publishers, partitions, and enables replay. More\ninformation is given in later section.\n\n\nThe streaming application is monitored by a decision making entity\ncalled STRAM (streaming application\nmanager).\u00a0STRAM is designed to be a light weight\ncontroller that 
 has minimal but sufficient interaction with the\napplication. This is done via periodic heartbeats. The\nSTRAM does the initial launch and periodically analyzes the system\nmetrics to decide if any run time action needs to be taken.\n\n\nA fundamental building block for the streaming platform\nis the concept of breaking up a stream into equal finite time slices\ncalled streaming windows. Each window contains the ordered\nset of tuples in that time slice. A typical duration of a window is 500\nms, but can be configured per application (the Yahoo! Finance\napplication configures this value in the  properties.xml\u00a0file to be 1000ms = 1s). Each\nwindow is preceded by a begin_window\u00a0event and is terminated by an\nend_window\u00a0event, and is assigned\na unique window ID. Even though the platform performs computations at\nthe tuple level, bookkeeping is done at the window boundary, making the\ncomputations within a window an atomic event in the platform. \u00a0We can\nthink of e
 ach window as an  atomic\nmicro-batch\u00a0of tuples, to be processed together as one\natomic operation (See Figure 2). \u00a0\n\n\nThis atomic batching allows the platform to avoid the very steep\nper tuple bookkeeping cost and instead has a manageable per batch\nbookkeeping cost. This translates to higher throughput, low recovery\ntime, and higher scalability. Later in this document we illustrate how\nthe atomic micro-batch concept allows more efficient optimization\nalgorithms.\n\n\nThe platform also has in-built support for\napplication windows.\u00a0 An application window is part of the\napplication specification, and can be a small or large multiple of the\nstreaming window. \u00a0An example from our Yahoo! Finance test application\nis the moving average, calculated over a sliding application window of 5\nminutes which equates to 300 (= 5 * 60) streaming windows.\n\n\nNote that these two window concepts are distinct. \u00a0A streaming\nwindow is an abstraction of many tuples i
 nto a higher atomic event for\neasier management. \u00a0An application window is a group of consecutive\nstreaming windows used for data aggregation (e.g. sum, average, maximum,\nminimum) on a per operator level.\n\n\n\n\nAlongside the platform,\u00a0a set of\npredefined, benchmarked standard library operator templates is provided\nfor ease of use and rapid development of application.\u00a0These\noperators are open sourced to Apache Software Foundation under the\nproject name \u201cMalhar\u201d as part of our efforts to foster community\ninnovation. These operators can be used in a DAG as is, while others\nhave properties\u00a0that can be set to specify the\ndesired computation. Those interested in details, should refer to\n\nApex-Malhar operator library\n.\n\n\nThe platform is a Hadoop YARN native\napplication. It runs in a Hadoop cluster just like any\nother YARN application (MapReduce etc.) and is designed to seamlessly\nintegrate with rest of Hadoop technology stack. It leverage
 s Hadoop as\nmuch as possible and relies on it as its distributed operating system.\nHadoop dependencies include resource management, compute/memory/network\nallocation, HDFS, security, fault tolerance, monitoring, metrics,\nmulti-tenancy, logging etc. Hadoop classes/concepts are reused as much\nas possible.  The aim is to enable enterprises\nto leverage their existing Hadoop infrastructure for real time streaming\napplications. The platform is designed to scale with big\ndata applications and scale with Hadoop.\n\n\nA streaming application is an asynchronous execution of\ncomputations across distributed nodes. All computations are done in\nparallel on a distributed cluster. The computation model is designed to\ndo as many parallel computations as possible in a non-blocking fashion.\nThe task of monitoring of the entire application is done on (streaming)\nwindow boundaries with a streaming window as an atomic entity. A window\ncompletion is a quantum of work done. There is no assump
 tion that an\noperator can be interrupted at precisely a particular tuple or window.\n\n\nAn operator itself also\ncannot assume or predict the exact time a tuple that it emitted would\nget consumed by downstream operators. The operator processes the tuples\nit gets and simply emits new tuples based on its business logic. The\nonly guarantee it has is that the upstream operators are processing\neither the current or some later window, and the downstream operator is\nprocessing either the current or some earlier window. The completion of\na window (i.e. propagation of the  end_window\u00a0event through an operator) in any\noperator guarantees that all upstream operators have finished processing\nthis window. Thus, the end_window\u00a0event is blocking on an operator\nwith multiple outputs, and is a synchronization point in the DAG. The\n begin_window\u00a0event does not have\nany such restriction, a single begin_window\u00a0event from any upstream operator\ntriggers the operator to s
 tart processing tuples.\n\n\nStreaming Application Manager (STRAM)\n\n\nStreaming Application Manager (STRAM) is the Hadoop YARN native\napplication master. STRAM is the first process that is activated upon\napplication launch and orchestrates the streaming application on the\nplatform. STRAM is a lightweight controller process. The\nresponsibilities of STRAM include\n\n\n\n\n\n\nRunning the Application\n\n\n\n\nRead the\u00a0logical plan\u00a0of the application (DAG) submitted by the client\n\n\nValidate the logical plan\n\n\nTranslate the logical plan into a physical plan, where certain operators may  be partitioned (i.e. replicated) to multiple operators for  handling load.\n\n\nRequest resources (Hadoop containers) from Resource Manager,\n    per physical plan\n\n\nBased on acquired resources and application attributes, create\n    an execution plan\u00a0by partitioning the DAG into fragments,\n    each assigned to different containers.\n\n\nExecutes the application by deploying
  each fragment to\n    its container. Containers then start stream processing and run\n    autonomously, processing one streaming window after another. Each\n    container is represented as an instance of the  StreamingContainer\u00a0class, which updates\n    STRAM via the heartbeat protocol and processes directions received\n    from STRAM.\n\n\n\n\n\n\n\n\nContinually monitoring the application via heartbeats from each StreamingContainer\n\n\n\n\nCollecting Application System Statistics and Logs\n\n\nLogging all application-wide decisions taken\n\n\nProviding system data on the state of the application via a  Web Service.\n\n\n\n\nSupporting Fault Tolerance\n\n\na.  Detecting a node outage\nb.  Requesting a replacement resource from the Resource Manager\n    and scheduling state restoration for the streaming operators\nc.  Saving state to Zookeeper\n\n\n\n\n\n\nSupporting Dynamic Partitioning:\u00a0Periodically evaluating the SLA and modifying the physical plan if required\n    (l
 ogical plan does not change).\n\n\n\n\nEnabling Security:\u00a0Distributing security tokens for distributed components of the execution engine\n    and securing web service requests.\n\n\nEnabling Dynamic modification of DAG: In the future, we intend to allow for user initiated\n    modification of the logical plan to allow for changes to the\n    processing logic and functionality.\n\n\n\n\nAn example of the Yahoo! Finance Quote application scheduled on a\ncluster of 5 Hadoop containers (processes) is shown in Figure 3.\n\n\n\n\nAn example for the translation from a logical plan to a physical\nplan and an execution plan for a subset of the application is shown in\nFigure 4.\n\n\n\n\nHadoop Components\n\n\nIn this section we cover some aspects of Hadoop that your\nstreaming application interacts with. This section is not meant to\neducate the reader on Hadoop, but just get the reader acquainted with\nthe terms. We strongly advise readers to learn Hadoop from other\nsources.\n\n\nA s
 treaming application runs as a native Hadoop 2.2 application.\nHadoop 2.2 does not differentiate between a map-reduce job and other\napplications, and hence as far as Hadoop is concerned, the streaming\napplication is just another job. This means that your application\nleverages all the bells and whistles Hadoop provides and is fully\nsupported within Hadoop technology stack. The platform is responsible\nfor properly integrating itself with the relevant components of Hadoop\nthat exist today and those that may emerge in the future\n\n\nAll investments that leverage multi-tenancy (for example quotas\nand queues), security (for example kerberos), data flow integration (for\nexample copying data in-out of HDFS), monitoring, metrics collections,\netc. will require no changes when streaming applications run on\nHadoop.\n\n\nYARN\n\n\nYARN\nis\nthe core library of Hadoop 2.2 that is tasked with resource management\nand works as a distributed application framework. In this section we\nwill
  walk through Yarn's components. In Hadoop 2.2, the old jobTracker\nhas been replaced by a combination of ResourceManager (RM) and\nApplicationMaster (AM).\n\n\nResource Manager (RM)\n\n\nResourceManager\n(RM)\nmanages all the distributed resources. It allocates and arbitrates all\nthe slots and the resources (cpu, memory, network) of these slots. It\nworks with per-node NodeManagers (NMs) and per-application\nApplicationMasters (AMs). Currently memory usage is monitored by RM; in\nupcoming releases it will have CPU as well as network management. RM is\nshared by map-reduce and streaming applications. Running streaming\napplications requires no changes in the RM.\n\n\nApplication Master (AM)\n\n\nThe AM is the watchdog or monitoring process for your application\nand has the responsibility of negotiating resources with RM and\ninteracting with NodeManagers to get the allocated containers started.\nThe AM is the starting point of your application and is considered user\ncode (not syst
 em Hadoop code). The AM itself runs in one container. All\nresource management within the application are managed by the AM. This\nis a critical feature for Hadoop 2.2 where tasks done by jobTracker in\nHadoop 1.0 have been distributed allowing Hadoop 2.2 to scale much\nbeyond Hadoop 1.0. STRAM is a native YARN ApplicationManager.\n\n\nNode Managers (NM)\n\n\nThere is one \nNodeManager\n(NM)\nper node in the cluster. All the containers (i.e. processes) on that\nnode are monitored by the NM. It takes instructions from RM and manages\nresources of that node as per RM instructions. NMs interactions are same\nfor map-reduce and for streaming applications. Running streaming\napplications requires no changes in the NM.\n\n\nRPC Protocol\n\n\nCommunication among RM, AM, and NM is done via the Hadoop RPC\nprotocol. Streaming applications use the same protocol to send their\ndata. No changes are needed in RPC support provided by Hadoop to enable\ncommunication done by components of your appl
 ication.\n\n\nHDFS\n\n\nHadoop includes a highly fault tolerant, high throughput\ndistributed file system (\nHDFS\n).\nIt runs on commodity hardware, and your streaming application will, by\ndefault, use it. There is no difference between files created by a\nstreaming application and those created by map-reduce.\n\n\nDeveloping An Application\n\n\nIn this chapter we describe the methodology to develop an\napplication using the Realtime Streaming Platform. The platform was\ndesigned to make it easy to build and launch sophisticated streaming\napplications with the developer having to deal only with the\napplication/business logic. The platform deals with details of where to\nrun what operators on which servers and how to correctly route streams\nof data among them.\n\n\nDevelopment Process\n\n\nWhile the platform does not mandate a specific methodology or set\nof development tools, we have recommendations to maximize productivity\nfor the different phases of application development.\
 n\n\nDesign\n\n\n\n\nIdentify common, reusable operators. Use a library\n    if possible.\n\n\nIdentify scalability and performance requirements before\n    designing the DAG.\n\n\nLeverage attributes that the platform supports for scalability\n    and performance.\n\n\nUse operators that are benchmarked and tested so that later\n    surprises are minimized. If you have glue code, create appropriate\n    unit tests for it.\n\n\nUse THREAD_LOCAL locality for high throughput streams. If all\n    the operators on that stream cannot fit in one container,\n    try\u00a0NODE_LOCAL\u00a0locality. Both THREAD_LOCAL and\n    NODE_LOCAL streams avoid the Network Interface Card (NIC)\n    completly. The former uses intra-process communication to also avoid\n    serialization-deserialization overhead.\n\n\nThe overall throughput and latencies are are not necessarily\n    correlated to the number of operators in a simple way -- the\n    relationship is more nuanced. A lot depends on how much wor
 k\n    individual operators are doing, how many are able to operate in\n    parallel, and how much data is flowing through the arcs of the DAG.\n    It is, at times, better to break a computation down into its\n    constituent simple parts and then stitch them together via streams\n    to better utilize the compute resources of the cluster. Decide on a\n    per application basis the fine line between complexity of each\n    operator vs too many streams. Doing multiple computations in one\n    operator does save network I/O, while operators that are too complex\n    are hard to maintain.\n\n\nDo not use operators that depend on the order of two streams\n    as far as possible. In such cases behavior is not idempotent.\n\n\nPersist key information to HDFS if possible; it may be useful\n    for debugging later.\n\n\nDecide on an appropriate fault tolerance mechanism. If some\n    data loss is acceptable, use the at-most-once mechanism as it has\n    fastest recovery.\n\n\n\n\nCreating 
 New Project\n\n\nPlease refer to the \nApex Application Packages\n\u00a0for\nthe basic steps for creating a new project.\n\n\nWriting the application code\n\n\nPreferably use an IDE (Eclipse, Netbeans etc.) that allows you to\nmanage dependencies and assists with the Java coding. Specific benefits\ninclude ease of managing operator library jar files, individual operator\nclasses, ports and properties. It will also highlight and assist to\nrectify issues such as type mismatches when adding streams while\ntyping.\n\n\nTesting\n\n\nWrite test cases with JUnit or similar test framework so that code\nis tested as it is written. For such testing, the DAG can run in local\nmode within the IDE. Doing this may involve writing mock input or output\noperators for the integration points with external systems. For example,\ninstead of reading from a live data stream, the application in test mode\ncan read from and write to files. This can be done with a single\napplication DAG by instrumenting a
  test mode using settings in the\nconfiguration that is passed to the application factory\ninterface.\n\n\nGood test coverage will not only eliminate basic validation errors\nsuch as missing port connections or property constraint violations, but\nalso validate the correct processing of the data. The same tests can be\nre-run whenever the application or its dependencies change (operator\nlibraries, version of the platform etc.)\n\n\nRunning an application\n\n\nThe platform provides a commandline tool called dtcli\u00a0for managing applications (launching,\nkilling, viewing, etc.). This tool was already discussed above briefly\nin the section entitled Running the Test Application. It will introspect\nthe jar file specified with the launch command for applications (classes\nthat implement ApplicationFactory) or property files that define\napplications. It will also deploy the dependency jar files from the\napplication package to the cluster.\n\n\nDtcli can run the application in local
  mode (i.e. outside a\ncluster). It is recommended to first run the application in local mode\nin the development environment before launching on the Hadoop cluster.\nThis way some of the external system integration and correct\nfunctionality of the application can be verified in an easier to debug\nenvironment before testing distributed mode.\n\n\nFor more details on CLI please refer to the \ndtCli Guide\n.\n\n\nApplication API\n\n\nThis section introduces the API to write a streaming application.\nThe work involves connecting operators via streams to form the logical\nDAG. The steps are\n\n\n\n\n\n\nInstantiate an application (DAG)\n\n\n\n\n\n\n(Optional) Set Attributes\n\n\n\n\nAssign application name\n\n\nSet any other attributes as per application requirements\n\n\n\n\n\n\n\n\nCreate/re-use and instantiate operators\n\n\n\n\nAssign operator name that is unique within the  application\n\n\nDeclare schema upfront for each operator (and thereby its ports)\n\n\n(Optional) Set prope
 rties\u00a0 and attributes on the dag as per specification\n\n\nConnect ports of operators via streams\n\n\nEach stream connects one output port of an operator to one or  more input ports of other operators.\n\n\n(Optional) Set attributes on the streams\n\n\n\n\n\n\n\n\n\n\n\n\nTest the application.\n\n\n\n\n\n\nThere are two methods to create an application, namely Java, and\nProperties file. Java API is for applications being developed by humans,\nand properties file (Hadoop like) is more suited for DAGs generated by\ntools.\n\n\nJava API\n\n\nThe Java API is the most common way to create a streaming\napplication. It is meant for application developers who prefer to\nleverage the features of Java, and the ease of use and enhanced\nproductivity provided by IDEs like NetBeans or Eclipse. Using Java to\nspecify the application provides extra validation abilities of Java\ncompiler, such as compile time checks for type safety at the time of\nwriting the code. Later in this chapter you 
 can read more about\nvalidation support in the platform.\n\n\nThe developer specifies the streaming application by implementing\nthe ApplicationFactory interface, which is how platform tools (CLI etc.)\nrecognize and instantiate applications. Here we show how to create a\nYahoo! Finance application that streams the last trade price of a ticker\nand computes the high and low price in every 1 min window. Run above\n test application\u00a0to execute the\nDAG in local mode within the IDE.\n\n\nLet us revisit how the Yahoo! Finance test application constructs the DAG:\n\n\npublic class Application implements StreamingApplication\n{\n\n  ...\n\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  {\n    dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds);\n\n    StockTickInput tick = getStockTickInputOperator(\nStockTickInput\n, dag);\n    SumKeyVal\nString, Long\n dailyVolume = getDailyVolumeOperator(\nDailyVolume\n, dag);\n    Co
 nsolidatorKeyVal\nString,Double,Long,String,?,?\n quoteOperator = getQuoteOperator(\nQuote\n, dag);\n\n    RangeKeyVal\nString, Double\n highlow = getHighLowOperator(\nHighLow\n, dag, appWindowCountMinute);\n    SumKeyVal\nString, Long\n minuteVolume = getMinuteVolumeOperator(\nMinuteVolume\n, dag, appWindowCountMinute);\n    ConsolidatorKeyVal\nString,HighLow,Long,?,?,?\n chartOperator = getChartOperator(\nChart\n, dag);\n\n    SimpleMovingAverage\nString, Double\n priceSMA = getPriceSimpleMovingAverageOperator(\nPriceSMA\n, dag, appWindowCountSMA);\n\n    dag.addStream(\nprice\n, tick.price, quoteOperator.in1, highlow.data, priceSMA.data);\n    dag.addStream(\nvol\n, tick.volume, dailyVolume.data, minuteVolume.data);\n    dag.addStream(\ntime\n, tick.time, quoteOperator.in3);\n    dag.addStream(\ndaily_vol\n, dailyVolume.sum, quoteOperator.in2);\n\n    dag.addStream(\nquote_data\n, quoteOperator.out, getConsole(\nquoteConsole\n, dag, \nQUOTE\n));\n\n    dag.addStream(\nhigh_low\n,
  highlow.range, chartOperator.in1);\n    dag.addStream(\nvol_1min\n, minuteVolume.sum, chartOperator.in2);\n    dag.addStream(\nchart_data\n, chartOperator.out, getConsole(\nchartConsole\n, dag, \nCHART\n));\n\n    dag.addStream(\nsma_price\n, priceSMA.doubleSMA, getConsole(\npriceSMAConsole\n, dag, \nPrice SMA\n));\n\n    return dag;\n  }\n}\n\n\n\n\nProperty File API\n\n\nThe platform also supports specification of a DAG via a property\nfile. The aim here to make it easy for tools to create and run an\napplication. This method of specification does not have the Java\ncompiler support of compile time check, but since these applications\nwould be created by software, they should be correct by construction.\nThe syntax is derived from Hadoop properties and should be easy for\nfolks who are used to creating software that integrated with\nHadoop.\n\n\nCreate an application (DAG): myApplication.properties\n\n\n# input operator that reads from a file\ndt.operator.inputOp.classname=com.ac
 me.SampleInputOperator\ndt.operator.inputOp.fileName=somefile.txt\n\n# output operator that writes to the console\ndt.operator.outputOp.classname=com.acme.ConsoleOutputOperator\n\n# stream connecting both operators\ndt.stream.inputStream.source=inputOp.outputPort\ndt.stream.inputStream.sinks=outputOp.inputPort\n\n\n\n\nAbove snippet is intended to convey the basic idea of specifying\nthe DAG without using Java. Operators would come from a predefined\nlibrary and referenced in the specification by class name and port names\n(obtained from the library providers documentation or runtime\nintrospection by tools). For those interested in details, see later\nsections and refer to the  Operation and\nInstallation Guide\u00a0mentioned above.\n\n\nAttributes\n\n\nAttributes impact the runtime behavior of the application. They do\nnot impact the functionality. An example of an attribute is application\nname. Setting it changes the application name. Another example is\nstreaming window size. S
 etting it changes the streaming window size from\nthe default value to the specified value. Users cannot add new\nattributes, they can only choose from the ones that come packaged and\npre-supported by the platform. Details of attributes are covered in the\n Operation and Installation\nGuide.\n\n\nOperators\n\n\nOperators\u00a0are basic compute units.\nOperators process each incoming tuple and emit zero or more tuples on\noutput ports as per the business logic. The data flow, connectivity,\nfault tolerance (node outage), etc. is taken care of by the platform. As\nan operator developer, all that is needed is to figure out what to do\nwith the incoming tuple and when (and which output port) to send out a\nparticular output tuple. Correctly designed operators will most likely\nget reused. Operator design needs care and foresight. For details, refer\nto the  \nOperator Developer Guide\n. As an application developer you need to connect operators\nin a way that it implements your business
  logic. You may also require\noperator customization for functionality and use attributes for\nperformance/scalability etc.\n\n\nAll operators process tuples asynchronously in a distributed\ncluster. An operator cannot assume or predict the exact time a tuple\nthat it emitted will get consumed by a downstream operator. An operator\nalso cannot predict the exact time when a tuple arrives from an upstream\noperator. The only guarantee is that the upstream operators are\nprocessing the current or a future window, i.e. the windowId of upstream\noperator is equals or exceeds its own windowId. Conversely the windowId\nof a downstream operator is less than or equals its own windowId. The\nend of a window operation, i.e. the API call to endWindow on an operator\nrequires that all upstream operators have finished processing this\nwindow. This means that completion of processing a window propagates in\na blocking fashion through an operator. Later sections provides more\ndetails on streams an
 d data flow of tuples.\n\n\nEach operator has a unique name within the DAG as provided by the\nuser. This is the name of the operator in the logical plan. The name of\nthe operator in the physical plan is an integer assigned to it by STRAM.\nThese integers are use the sequence from 1 to N, where N is total number\nof physically unique operators in the DAG. \u00a0Following the same rule,\neach partitioned instance of a logical operator has its own integer as\nan id. This id along with the Hadoop container name uniquely identifies\nthe operator in the execution plan of the DAG. The logical names and the\nphysical names are required for web service support. Operators can be\naccessed via both names. These same names are used while interacting\nwith  dtcli\u00a0to access an operator.\nIdeally these names should be self-descriptive. For example in Figure 1,\nthe node named \u201cDaily volume\u201d has a physical identifier of 2.\n\n\nOperator Interface\n\n\nOperator interface in a DAG co
 nsists of ports,\u00a0properties,\u00a0and attributes.\nOperators interact with other components of the DAG via ports. Functional behavior of the operators\ncan be customized via parameters. Run time performance and physical\ninstantiation is controlled by attributes. Ports and parameters are\nfields (variables) of the Operator class/object, while attributes are\nmeta information that is attached to the operator object via an\nAttributeMap. An operator must have at least one port. Properties are\noptional. Attributes are provided by the platform and always have a\ndefault value that enables normal functioning of operators.\n\n\nPorts\n\n\nPorts are connection points by which an operator receives and\nemits tuples. These should be transient objects instantiated in the\noperator object, that implement particular interfaces. Ports should be\ntransient as they contain no state. They have a pre-defined schema and\ncan only be connected to other ports with the same schema. An input port\n
 needs to implement the interface  Operator.InputPort\u00a0and\ninterface Sink. A default\nimplementation of these is provided by the abstract class DefaultInputPort. An output port needs to\nimplement the interface  Operator.OutputPort. A default implementation\nof this is provided by the concrete class DefaultOutputPort. These two are a quick way to\nimplement the above interfaces, but operator developers have the option\nof providing their own implementations.\n\n\nHere are examples of an input and an output port from the operator\nSum.\n\n\n@InputPortFieldAnnotation(name = \ndata\n)\npublic final transient DefaultInputPort\nV\n data = new DefaultInputPort\nV\n() {\n  @Override\n  public void process(V tuple)\n  {\n    ...\n  }\n}\n@OutputPortFieldAnnotation(optional=true)\npublic final transient DefaultOutputPort\nV\n sum = new DefaultOutputPort\nV\n(){ \u2026 };\n\n\n\n\nThe process call is in the Sink interface. An emit on an output\nport is done via emit(tuple) call. For the a
 bove example it would be\nsum.emit(t), where the type of t is the generic parameter V.\n\n\nThere is no limit on how many ports an operator can have. However\nany operator must have at least one port. An operator with only one port\nis called an Input Adapter if it has no input port and an Output Adapter\nif it has no output port. These are special operators needed to get/read\ndata from outside system/source into the application, or push/write data\ninto an outside system/sink. These could be in Hadoop or outside of\nHadoop. These two operators are in essence gateways for the streaming\napplication to communicate with systems outside the application.\n\n\nPort connectivity can be validated during compile time by adding\nPortFieldAnnotations shown above. By default all ports have to be\nconnected, to allow a port to go unconnected, you need to add\n\u201coptional=true\u201d to the annotation.\n\n\nAttributes can be specified for ports that affect the runtime\nbehavior. An example of
  an attribute is parallel partition that specifes\na parallel computation flow per partition. It is described in detail in\nthe Parallel Partitions section. Another example is queue capacity that specifies the buffer size for the\nport. Details of attributes are covered in  Operation and Installation Guide.\n\n\nProperties\n\n\nProperties are the abstractions by which functional behavior of an\noperator can be customized. They should be non-transient objects\ninstantiated in the operator object. They need to be non-transient since\nthey are part of the operator state and re-construction of the operator\nobject from its checkpointed state must restore the operator to the\ndesired state. Properties are optional, i.e. an operator may or may not\nhave properties; they are part of user code and their values are not\ninterpreted by the platform in any way.\n\n\nAll non-serializable objects should be declared transient.\nExamples include sockets, session information, etc. These objects sho
 uld\nbe initialized during setup call, which is called every time the\noperator is initialized.\n\n\nAttributes\n\n\nAttributes are values assigned to the operators that impact\nrun-time. This includes things like the number of partitions, at most\nonce or at least once or exactly once recovery modes, etc. Attributes do\nnot impact functionality of the operator. Users can change certain\nattributes in runtime. Users cannot add attributes to operators; they\nare pre-defined by the platform. They are interpreted by the platform\nand thus cannot be defined in user created code (like properties).\nDetails of attributes are covered in  \nConfiguration Guide\n.\n\n\nOperator State\n\n\nThe state of an operator is defined as the data that it transfers\nfrom one window to a future window. Since the computing model of the\nplatform is to treat windows like micro-batches, the operator state can\nbe checkpointed every Nth window, or every T units of time, where T is significantly greater\nthan
  the streaming window. \u00a0When an operator is checkpointed, the entire\nobject is written to HDFS. \u00a0The larger the amount of state in an\noperator, the longer it takes to recover from a failure. A stateless\noperator can recover much quicker than a stateful one. The needed\nwindows are preserved by the upstream buffer server and are used to\nrecompute the lost windows, and also rebuild the buffer server in the\ncurrent container.\n\n\nThe distinction between Stateless and Stateful is based solely on\nthe need to transfer data in the operator from one window to the next.\nThe state of an operator is independent of the number of ports.\n\n\nStateless\n\n\nA Stateless operator is defined as one where no data is needed to\nbe kept at the end of every window. This means that all the computations\nof a window can be derived from all the tuples the operator receives\nwithin that window. This guarantees that the output of any window can be\nreconstructed by simply replaying the tupl
 es that arrived in that\nwindow. Stateless operators are more efficient in terms of fault\ntolerance, and cost to achieve SLA.\n\n\nStateful\n\n\nA Stateful operator is defined as one where data is needed to be\nstored at the end of a window for computations occurring in later\nwindow; a common example is the computation of a sum of values in the\ninput tuples.\n\n\nOperator API\n\n\nThe Operator API consists of methods that operator developers may\nneed to override. In this section we will discuss the Operator APIs from\nthe point of view of an application developer. Knowledge of how an\noperator works internally is critical for writing an application. Those\ninterested in the details should refer to  Malhar Operator Developer Guide.\n\n\nThe APIs are available in three modes, namely Single Streaming\nWindow, Sliding Application Window, and Aggregate Application Window.\nThese are not mutually exclusive, i.e. an operator can use single\nstreaming window as well as sliding applicati
 on window. A physical\ninstance of an operator is always processing tuples from a single\nwindow. The processing of tuples is guaranteed to be sequential, no\nmatter which input port the tuples arrive on.\n\n\nIn the later part of this section we will evaluate three common\nuses of streaming windows by applications. They have different\ncharacteristics and implications on optimization and recovery mechanisms\n(i.e. algorithm used to recover a node after outage) as discussed later\nin the section.\n\n\nStreaming Window\n\n\nStreaming window is atomic micro-batch computation period. The API\nmethods relating to a streaming window are as follows\n\n\npublic void process(\ntuple_type\n tuple) // Called on the input port on which the tuple arrives\npublic void beginWindow(long windowId) // Called at the start of the window as soon as the first begin_window tuple arrives\npublic void endWindow() // Called at the end of the window after end_window tuples arrive on all input ports\npublic v
 oid setup(OperatorContext context) // Called once during initialization of the operator\npublic void teardown() // Called once when the operator is being shutdown\n\n\n\n\nA tuple can be emitted in any of the three streaming run-time\ncalls, namely beginWindow, process, and endWindow but not in setup or\nteardown.\n\n\nAggregate Application Window\n\n\nAn operator with an aggregate window is stateful within the\napplication window timeframe and possibly stateless at the end of that\napplication window. An size of an aggregate application window is an\noperator attribute and is defined as a multiple of the streaming window\nsize. The platform recognizes this attribute and optimizes the operator.\nThe beginWindow, and endWindow calls are not invoked for those streaming\nwindows that do not align with the application window. For example in\ncase of streaming window of 0.5 second and application window of 5\nminute, an application window spans 600 streaming windows (5*60*2 =\n600). At t
 he start of the sequence of these 600 atomic streaming\nwindows, a beginWindow gets invoked, and at the end of these 600\nstreaming windows an endWindow gets invoked. All the intermediate\nstreaming windows do not invoke beginWindow or endWindow. Bookkeeping,\nnode recovery, stats, UI, etc. continue to work off streaming windows.\nFor example if operators are being checkpointed say on an average every\n30th window, then the above application window would have about 20\ncheckpoints.\n\n\nSliding Application Window\n\n\nA sliding window is computations that requires previous N\nstreaming windows. After each streaming window the Nth past window is\ndropped and the new window is added to the computation. An operator with\nsliding window is a stateful operator at end of any window. The sliding\nwindow period is an attribute and is a multiple of streaming window. The\nplatform recognizes this attribute and leverages it during bookkeeping.\nA sliding aggregate window with tolerance to data
  loss does not have a\nvery high bookkeeping cost. The cost of all three recovery mechanisms,\n at most once\u00a0(data loss tolerant),\nat least once\u00a0(data loss\nintolerant), and exactly once\u00a0(data\nloss intolerant and no extra computations) is same as recovery\nmechanisms based on streaming window. STRAM is not able to leverage this\noperator for any extra optimization.\n\n\nSingle vs Multi-Input Operator\n\n\nA single-input operator by definition has a single upstream\noperator, since there can only be one writing port for a stream. \u00a0If an\noperator has a single upstream operator, then the beginWindow on the\nupstream also blocks the beginWindow of the single-input operator. For\nan operator to start processing any window at least one upstream\noperator has to start processing that window. A multi-input operator\nreads from more than one upstream ports. Such an operator would start\nprocessing as soon as the first begin_window event arrives. However the\nwindow wou
 ld not close (i.e. invoke endWindow) till all ports receive\nend_window events for that windowId. Thus the end of a window is a\nblocking event. As we saw earlier, a multi-input operator is also the\npoint in the DAG where windows of all upstream operators are\nsynchronized. The windows (atomic micro-batches) from a faster (or just\nahead in processing) upstream operators are queued up till the slower\nupstream operator catches up. STRAM monitors such bottlenecks and takes\ncorrective actions. The platform ensures minimal delay, i.e processing\nstarts as long as at least one upstream operator has started\nprocessing.\n\n\nRecovery Mechanisms\n\n\nApplication developers can set any of the recovery mechanisms\nbelow to deal with node outage. In general, the cost of recovery depends\non the state of the operator, while data integrity is dependant on the\napplication. The mechanisms are per window as the platform treats\nwindows as atomic compute units. Three recovery mechanisms are\nsu
 pported, namely\n\n\n\n\nAt-least-once: All atomic batches are processed at least once.\n    No data loss occurs.\n\n\nAt-most-once: All atomic batches are processed at most once.\n    Data loss is possible; this is the most efficient setting.\n\n\nExactly-once: All atomic batches are processed exactly once.\n    No data loss occurs; this is the least efficient setting since\n    additional work is needed to ensure proper semantics.\n\n\n\n\nAt-least-once is the default. During a recovery event, the\noperator connects to the upstream buffer server and asks for windows to\nbe replayed. At-least-once and exactly-once mechanisms start from its\ncheckpointed state. At-most-once starts from the next begin-window\nevent.\n\n\nRecovery mechanisms can be specified per Operator while writing\nthe application as shown below.\n\n\nOperator o = dag.addOperator(\u201coperator\u201d, \u2026);\ndag.setAttribute(o,  OperatorContext.PROCESSING_MODE,  ProcessingMode.AT_MOST_ONCE);\n\n\n\n\nAlso note 
 that once an operator is attributed to AT_MOST_ONCE,\nall the operators downstream to it have to be AT_MOST_ONCE. The client\nwill give appropriate warnings or errors if that\u2019s not the case.\n\n\nDetails are explained in the chapter on Fault Tolerance below.\n\n\nStreams\n\n\nA stream\u00a0is a connector\n(edge) abstraction, and is a fundamental building block of the platform.\nA stream consists of tuples that flow from one port (called the\noutput\u00a0port) to one or more ports\non other operators (called  input\u00a0ports) another -- so note a potentially\nconfusing aspect of this terminology: tuples enter a stream through its\noutput port and leave via one or more input ports. A stream has the\nfollowing characteristics\n\n\n\n\nTuples are always delivered in the same order in which they\n    were emitted.\n\n\nConsists of a sequence of windows one after another. Each\n    window being a collection of in-order tuples.\n\n\nA stream that connects two containers passes throug
 h a\n    buffer server.\n\n\nAll streams can be persisted (by default in HDFS).\n\n\nExactly one output port writes to the stream.\n\n\nCan be read by one or more input ports.\n\n\nConnects operators within an application, not outside\n    an application.\n\n\nHas an unique name within an application.\n\n\nHas attributes which act as hints to STRAM.\n\n\n\n\nStreams have four modes, namely in-line, in-node, in-rack,\n    and other. Modes may be overruled (for example due to lack\n    of containers). They are defined as follows:\n\n\n\n\nTHREAD_LOCAL: In the same thread, uses thread\n    stack (intra-thread). This mode can only be used for a downstream\n    operator which has only one input port connected; also called\n    in-line.\n\n\nCONTAINER_LOCAL: In the same container (intra-process); also\n    called in-container.\n\n\nNODE_LOCAL: In the same Hadoop node (inter processes, skips\n    NIC); also called in-node.\n\n\nRACK_LOCAL: On nodes in the same rack; also called\n    in-rac
 k.\n\n\nunspecified: No guarantee. Could be anywhere within the\n    cluster\n\n\n\n\n\n\n\n\nAn example of a stream declaration is given below\n\n\nDAG dag = new DAG();\n \u2026\ndag.addStream(\nviews\n, viewAggregate.sum, cost.data).setLocality(CONTAINER_LOCAL); // A container local  stream\ndag.addStream(\u201cclicks\u201d, clickAggregate.sum, rev.data); // An example of unspecified locality\n\n\n\n\nThe platform guarantees in-order delivery of tuples in a stream.\nSTRAM views each stream as collection of ordered windows. Since no tuple\ncan exist outside a window, a replay of a stream consists of replay of a\nset of windows. When multiple input ports read the same stream, the\nexecution plan of a stream ensures that each input port is logically not\nblocked by the reading of another input port. The schema of a stream is\nsame as the schema of the tuple.\n\n\nIn a stream all tuples emitted by an operator in a window belong\nto that window. A replay of this window would consists o
 f an in-order\nreplay of all the tuples. Thus the tuple order within a stream is\nguaranteed. However since an operator may receive multiple streams (for\nexample an operator with

<TRUNCATED>