You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2016/03/10 02:54:03 UTC

[2/3] incubator-apex-site git commit: from 1ae98d4ad92c8c8ddb432b795d550efb7213dcbf

http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/75cddb1b/content/docs/apex-3.3/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/content/docs/apex-3.3/mkdocs/search_index.json b/content/docs/apex-3.3/mkdocs/search_index.json
index 85c9b89..34dd42f 100644
--- a/content/docs/apex-3.3/mkdocs/search_index.json
+++ b/content/docs/apex-3.3/mkdocs/search_index.json
@@ -72,10 +72,20 @@
         }, 
         {
             "location": "/application_development/#hadoop-cluster", 
-            "text": "In this section we discuss various Hadoop cluster setups.  Single Node Cluster  In a single node Hadoop cluster all services are deployed on a\nsingle server (a developer can use his/her development machine as a\nsingle node cluster). The platform does not distinguish between a single\nor multi-node setup and behaves exactly the same in both cases.  In this mode, the resource manager, name node, data node, and node\nmanager occupy one process each. This is an example of running a\nstreaming application as a multi-process\u00a0application on the same server.\nWith prevalence of fast, multi-core systems, this mode is effective for\ndebugging, fine tuning, and generic analysis before submitting the job\nto a larger Hadoop cluster. In this mode, execution uses the Hadoop\nservices and hence is likely to identify issues that are related to the\nHadoop environment (such issues will not be uncovered in local mode).\nThe throughput will obviously not be as high as on a 
 multi-node Hadoop\ncluster. Additionally, since each container (i.e. Java process) requires\na significant amount of memory, you will be able to run a much smaller\nnumber of containers than on a multi-node cluster.  Multi-Node Cluster  In a multi-node Hadoop cluster all the services of Hadoop are\ntypically distributed across multiple nodes in a production or\nproduction-level test environment. Upon launch the application is\nsubmitted to the Hadoop cluster and executes as a  multi-processapplication on\u00a0multiple nodes.  Before you start deploying, testing and troubleshooting your\napplication on a cluster, you should ensure that Hadoop (version 2.2.0\nor later)\u00a0is properly installed and\nyou have basic skills for working with it.", 
+            "text": "In this section we discuss various Hadoop cluster setups.", 
             "title": "Hadoop Cluster"
         }, 
         {
+            "location": "/application_development/#single-node-cluster", 
+            "text": "In a single node Hadoop cluster all services are deployed on a\nsingle server (a developer can use his/her development machine as a\nsingle node cluster). The platform does not distinguish between a single\nor multi-node setup and behaves exactly the same in both cases.  In this mode, the resource manager, name node, data node, and node\nmanager occupy one process each. This is an example of running a\nstreaming application as a multi-process\u00a0application on the same server.\nWith prevalence of fast, multi-core systems, this mode is effective for\ndebugging, fine tuning, and generic analysis before submitting the job\nto a larger Hadoop cluster. In this mode, execution uses the Hadoop\nservices and hence is likely to identify issues that are related to the\nHadoop environment (such issues will not be uncovered in local mode).\nThe throughput will obviously not be as high as on a multi-node Hadoop\ncluster. Additionally, since each container (i.e. Java proces
 s) requires\na significant amount of memory, you will be able to run a much smaller\nnumber of containers than on a multi-node cluster.", 
+            "title": "Single Node Cluster"
+        }, 
+        {
+            "location": "/application_development/#multi-node-cluster", 
+            "text": "In a multi-node Hadoop cluster all the services of Hadoop are\ntypically distributed across multiple nodes in a production or\nproduction-level test environment. Upon launch the application is\nsubmitted to the Hadoop cluster and executes as a  multi-processapplication on\u00a0multiple nodes.  Before you start deploying, testing and troubleshooting your\napplication on a cluster, you should ensure that Hadoop (version 2.2.0\nor later)\u00a0is properly installed and\nyou have basic skills for working with it.", 
+            "title": "Multi-Node Cluster"
+        }, 
+        {
             "location": "/application_development/#apache-apex-platform-overview", 
             "text": "", 
             "title": "Apache Apex Platform Overview"
@@ -92,40 +102,190 @@
         }, 
         {
             "location": "/application_development/#hadoop-components", 
-            "text": "In this section we cover some aspects of Hadoop that your\nstreaming application interacts with. This section is not meant to\neducate the reader on Hadoop, but just get the reader acquainted with\nthe terms. We strongly advise readers to learn Hadoop from other\nsources.  A streaming application runs as a native Hadoop 2.2 application.\nHadoop 2.2 does not differentiate between a map-reduce job and other\napplications, and hence as far as Hadoop is concerned, the streaming\napplication is just another job. This means that your application\nleverages all the bells and whistles Hadoop provides and is fully\nsupported within Hadoop technology stack. The platform is responsible\nfor properly integrating itself with the relevant components of Hadoop\nthat exist today and those that may emerge in the future  All investments that leverage multi-tenancy (for example quotas\nand queues), security (for example kerberos), data flow integration (for\nexample copying data i
 n-out of HDFS), monitoring, metrics collections,\netc. will require no changes when streaming applications run on\nHadoop.  YARN  YARN is\nthe core library of Hadoop 2.2 that is tasked with resource management\nand works as a distributed application framework. In this section we\nwill walk through Yarn's components. In Hadoop 2.2, the old jobTracker\nhas been replaced by a combination of ResourceManager (RM) and\nApplicationMaster (AM).  Resource Manager (RM)  ResourceManager (RM)\nmanages all the distributed resources. It allocates and arbitrates all\nthe slots and the resources (cpu, memory, network) of these slots. It\nworks with per-node NodeManagers (NMs) and per-application\nApplicationMasters (AMs). Currently memory usage is monitored by RM; in\nupcoming releases it will have CPU as well as network management. RM is\nshared by map-reduce and streaming applications. Running streaming\napplications requires no changes in the RM.  Application Master (AM)  The AM is the watchdog 
 or monitoring process for your application\nand has the responsibility of negotiating resources with RM and\ninteracting with NodeManagers to get the allocated containers started.\nThe AM is the starting point of your application and is considered user\ncode (not system Hadoop code). The AM itself runs in one container. All\nresource management within the application are managed by the AM. This\nis a critical feature for Hadoop 2.2 where tasks done by jobTracker in\nHadoop 1.0 have been distributed allowing Hadoop 2.2 to scale much\nbeyond Hadoop 1.0. STRAM is a native YARN ApplicationManager.  Node Managers (NM)  There is one  NodeManager (NM)\nper node in the cluster. All the containers (i.e. processes) on that\nnode are monitored by the NM. It takes instructions from RM and manages\nresources of that node as per RM instructions. NMs interactions are same\nfor map-reduce and for streaming applications. Running streaming\napplications requires no changes in the NM.  RPC Protocol  C
 ommunication among RM, AM, and NM is done via the Hadoop RPC\nprotocol. Streaming applications use the same protocol to send their\ndata. No changes are needed in RPC support provided by Hadoop to enable\ncommunication done by components of your application.  HDFS  Hadoop includes a highly fault tolerant, high throughput\ndistributed file system ( HDFS ).\nIt runs on commodity hardware, and your streaming application will, by\ndefault, use it. There is no difference between files created by a\nstreaming application and those created by map-reduce.", 
+            "text": "In this section we cover some aspects of Hadoop that your\nstreaming application interacts with. This section is not meant to\neducate the reader on Hadoop, but just get the reader acquainted with\nthe terms. We strongly advise readers to learn Hadoop from other\nsources.  A streaming application runs as a native Hadoop 2.2 application.\nHadoop 2.2 does not differentiate between a map-reduce job and other\napplications, and hence as far as Hadoop is concerned, the streaming\napplication is just another job. This means that your application\nleverages all the bells and whistles Hadoop provides and is fully\nsupported within Hadoop technology stack. The platform is responsible\nfor properly integrating itself with the relevant components of Hadoop\nthat exist today and those that may emerge in the future  All investments that leverage multi-tenancy (for example quotas\nand queues), security (for example kerberos), data flow integration (for\nexample copying data i
 n-out of HDFS), monitoring, metrics collections,\netc. will require no changes when streaming applications run on\nHadoop.", 
             "title": "Hadoop Components"
         }, 
         {
+            "location": "/application_development/#yarn", 
+            "text": "YARN is\nthe core library of Hadoop 2.2 that is tasked with resource management\nand works as a distributed application framework. In this section we\nwill walk through Yarn's components. In Hadoop 2.2, the old jobTracker\nhas been replaced by a combination of ResourceManager (RM) and\nApplicationMaster (AM).", 
+            "title": "YARN"
+        }, 
+        {
+            "location": "/application_development/#resource-manager-rm", 
+            "text": "ResourceManager (RM)\nmanages all the distributed resources. It allocates and arbitrates all\nthe slots and the resources (cpu, memory, network) of these slots. It\nworks with per-node NodeManagers (NMs) and per-application\nApplicationMasters (AMs). Currently memory usage is monitored by RM; in\nupcoming releases it will have CPU as well as network management. RM is\nshared by map-reduce and streaming applications. Running streaming\napplications requires no changes in the RM.", 
+            "title": "Resource Manager (RM)"
+        }, 
+        {
+            "location": "/application_development/#application-master-am", 
+            "text": "The AM is the watchdog or monitoring process for your application\nand has the responsibility of negotiating resources with RM and\ninteracting with NodeManagers to get the allocated containers started.\nThe AM is the starting point of your application and is considered user\ncode (not system Hadoop code). The AM itself runs in one container. All\nresource management within the application are managed by the AM. This\nis a critical feature for Hadoop 2.2 where tasks done by jobTracker in\nHadoop 1.0 have been distributed allowing Hadoop 2.2 to scale much\nbeyond Hadoop 1.0. STRAM is a native YARN ApplicationManager.", 
+            "title": "Application Master (AM)"
+        }, 
+        {
+            "location": "/application_development/#node-managers-nm", 
+            "text": "There is one  NodeManager (NM)\nper node in the cluster. All the containers (i.e. processes) on that\nnode are monitored by the NM. It takes instructions from RM and manages\nresources of that node as per RM instructions. NMs interactions are same\nfor map-reduce and for streaming applications. Running streaming\napplications requires no changes in the NM.", 
+            "title": "Node Managers (NM)"
+        }, 
+        {
+            "location": "/application_development/#rpc-protocol", 
+            "text": "Communication among RM, AM, and NM is done via the Hadoop RPC\nprotocol. Streaming applications use the same protocol to send their\ndata. No changes are needed in RPC support provided by Hadoop to enable\ncommunication done by components of your application.", 
+            "title": "RPC Protocol"
+        }, 
+        {
+            "location": "/application_development/#hdfs", 
+            "text": "Hadoop includes a highly fault tolerant, high throughput\ndistributed file system ( HDFS ).\nIt runs on commodity hardware, and your streaming application will, by\ndefault, use it. There is no difference between files created by a\nstreaming application and those created by map-reduce.", 
+            "title": "HDFS"
+        }, 
+        {
             "location": "/application_development/#developing-an-application", 
             "text": "In this chapter we describe the methodology to develop an\napplication using the Realtime Streaming Platform. The platform was\ndesigned to make it easy to build and launch sophisticated streaming\napplications with the developer having to deal only with the\napplication/business logic. The platform deals with details of where to\nrun what operators on which servers and how to correctly route streams\nof data among them.", 
             "title": "Developing An Application"
         }, 
         {
             "location": "/application_development/#development-process", 
-            "text": "While the platform does not mandate a specific methodology or set\nof development tools, we have recommendations to maximize productivity\nfor the different phases of application development.  Design   Identify common, reusable operators. Use a library\n    if possible.  Identify scalability and performance requirements before\n    designing the DAG.  Leverage attributes that the platform supports for scalability\n    and performance.  Use operators that are benchmarked and tested so that later\n    surprises are minimized. If you have glue code, create appropriate\n    unit tests for it.  Use THREAD_LOCAL locality for high throughput streams. If all\n    the operators on that stream cannot fit in one container,\n    try\u00a0NODE_LOCAL\u00a0locality. Both THREAD_LOCAL and\n    NODE_LOCAL streams avoid the Network Interface Card (NIC)\n    completly. The former uses intra-process communication to also avoid\n    serialization-deserialization overhead.  The overa
 ll throughput and latencies are are not necessarily\n    correlated to the number of operators in a simple way -- the\n    relationship is more nuanced. A lot depends on how much work\n    individual operators are doing, how many are able to operate in\n    parallel, and how much data is flowing through the arcs of the DAG.\n    It is, at times, better to break a computation down into its\n    constituent simple parts and then stitch them together via streams\n    to better utilize the compute resources of the cluster. Decide on a\n    per application basis the fine line between complexity of each\n    operator vs too many streams. Doing multiple computations in one\n    operator does save network I/O, while operators that are too complex\n    are hard to maintain.  Do not use operators that depend on the order of two streams\n    as far as possible. In such cases behavior is not idempotent.  Persist key information to HDFS if possible; it may be useful\n    for debugging later.  De
 cide on an appropriate fault tolerance mechanism. If some\n    data loss is acceptable, use the at-most-once mechanism as it has\n    fastest recovery.   Creating New Project  Please refer to the  Apex Application Packages \u00a0for\nthe basic steps for creating a new project.  Writing the application code  Preferably use an IDE (Eclipse, Netbeans etc.) that allows you to\nmanage dependencies and assists with the Java coding. Specific benefits\ninclude ease of managing operator library jar files, individual operator\nclasses, ports and properties. It will also highlight and assist to\nrectify issues such as type mismatches when adding streams while\ntyping.  Testing  Write test cases with JUnit or similar test framework so that code\nis tested as it is written. For such testing, the DAG can run in local\nmode within the IDE. Doing this may involve writing mock input or output\noperators for the integration points with external systems. For example,\ninstead of reading from a live da
 ta stream, the application in test mode\ncan read from and write to files. This can be done with a single\napplication DAG by instrumenting a test mode using settings in the\nconfiguration that is passed to the application factory\ninterface.  Good test coverage will not only eliminate basic validation errors\nsuch as missing port connections or property constraint violations, but\nalso validate the correct processing of the data. The same tests can be\nre-run whenever the application or its dependencies change (operator\nlibraries, version of the platform etc.)  Running an application  The platform provides a commandline tool called dtcli\u00a0for managing applications (launching,\nkilling, viewing, etc.). This tool was already discussed above briefly\nin the section entitled Running the Test Application. It will introspect\nthe jar file specified with the launch command for applications (classes\nthat implement ApplicationFactory) or property files that define\napplications. It wi
 ll also deploy the dependency jar files from the\napplication package to the cluster.  Dtcli can run the application in local mode (i.e. outside a\ncluster). It is recommended to first run the application in local mode\nin the development environment before launching on the Hadoop cluster.\nThis way some of the external system integration and correct\nfunctionality of the application can be verified in an easier to debug\nenvironment before testing distributed mode.  For more details on CLI please refer to the  dtCli Guide .", 
+            "text": "While the platform does not mandate a specific methodology or set\nof development tools, we have recommendations to maximize productivity\nfor the different phases of application development.", 
             "title": "Development Process"
         }, 
         {
+            "location": "/application_development/#design", 
+            "text": "Identify common, reusable operators. Use a library\n    if possible.  Identify scalability and performance requirements before\n    designing the DAG.  Leverage attributes that the platform supports for scalability\n    and performance.  Use operators that are benchmarked and tested so that later\n    surprises are minimized. If you have glue code, create appropriate\n    unit tests for it.  Use THREAD_LOCAL locality for high throughput streams. If all\n    the operators on that stream cannot fit in one container,\n    try\u00a0NODE_LOCAL\u00a0locality. Both THREAD_LOCAL and\n    NODE_LOCAL streams avoid the Network Interface Card (NIC)\n    completly. The former uses intra-process communication to also avoid\n    serialization-deserialization overhead.  The overall throughput and latencies are are not necessarily\n    correlated to the number of operators in a simple way -- the\n    relationship is more nuanced. A lot depends on how much work\n    individual op
 erators are doing, how many are able to operate in\n    parallel, and how much data is flowing through the arcs of the DAG.\n    It is, at times, better to break a computation down into its\n    constituent simple parts and then stitch them together via streams\n    to better utilize the compute resources of the cluster. Decide on a\n    per application basis the fine line between complexity of each\n    operator vs too many streams. Doing multiple computations in one\n    operator does save network I/O, while operators that are too complex\n    are hard to maintain.  Do not use operators that depend on the order of two streams\n    as far as possible. In such cases behavior is not idempotent.  Persist key information to HDFS if possible; it may be useful\n    for debugging later.  Decide on an appropriate fault tolerance mechanism. If some\n    data loss is acceptable, use the at-most-once mechanism as it has\n    fastest recovery.", 
+            "title": "Design"
+        }, 
+        {
+            "location": "/application_development/#creating-new-project", 
+            "text": "Please refer to the  Apex Application Packages \u00a0for\nthe basic steps for creating a new project.", 
+            "title": "Creating New Project"
+        }, 
+        {
+            "location": "/application_development/#writing-the-application-code", 
+            "text": "Preferably use an IDE (Eclipse, Netbeans etc.) that allows you to\nmanage dependencies and assists with the Java coding. Specific benefits\ninclude ease of managing operator library jar files, individual operator\nclasses, ports and properties. It will also highlight and assist to\nrectify issues such as type mismatches when adding streams while\ntyping.", 
+            "title": "Writing the application code"
+        }, 
+        {
+            "location": "/application_development/#testing", 
+            "text": "Write test cases with JUnit or similar test framework so that code\nis tested as it is written. For such testing, the DAG can run in local\nmode within the IDE. Doing this may involve writing mock input or output\noperators for the integration points with external systems. For example,\ninstead of reading from a live data stream, the application in test mode\ncan read from and write to files. This can be done with a single\napplication DAG by instrumenting a test mode using settings in the\nconfiguration that is passed to the application factory\ninterface.  Good test coverage will not only eliminate basic validation errors\nsuch as missing port connections or property constraint violations, but\nalso validate the correct processing of the data. The same tests can be\nre-run whenever the application or its dependencies change (operator\nlibraries, version of the platform etc.)", 
+            "title": "Testing"
+        }, 
+        {
+            "location": "/application_development/#running-an-application", 
+            "text": "The platform provides a commandline tool called dtcli\u00a0for managing applications (launching,\nkilling, viewing, etc.). This tool was already discussed above briefly\nin the section entitled Running the Test Application. It will introspect\nthe jar file specified with the launch command for applications (classes\nthat implement ApplicationFactory) or property files that define\napplications. It will also deploy the dependency jar files from the\napplication package to the cluster.  Dtcli can run the application in local mode (i.e. outside a\ncluster). It is recommended to first run the application in local mode\nin the development environment before launching on the Hadoop cluster.\nThis way some of the external system integration and correct\nfunctionality of the application can be verified in an easier to debug\nenvironment before testing distributed mode.  For more details on CLI please refer to the  dtCli Guide .", 
+            "title": "Running an application"
+        }, 
+        {
             "location": "/application_development/#application-api", 
-            "text": "This section introduces the API to write a streaming application.\nThe work involves connecting operators via streams to form the logical\nDAG. The steps are    Instantiate an application (DAG)    (Optional) Set Attributes   Assign application name  Set any other attributes as per application requirements     Create/re-use and instantiate operators   Assign operator name that is unique within the  application  Declare schema upfront for each operator (and thereby its ports)  (Optional) Set properties\u00a0 and attributes on the dag as per specification  Connect ports of operators via streams  Each stream connects one output port of an operator to one or  more input ports of other operators.  (Optional) Set attributes on the streams       Test the application.    There are two methods to create an application, namely Java, and\nProperties file. Java API is for applications being developed by humans,\nand properties file (Hadoop like) is more suited for DAGs gener
 ated by\ntools.  Java API  The Java API is the most common way to create a streaming\napplication. It is meant for application developers who prefer to\nleverage the features of Java, and the ease of use and enhanced\nproductivity provided by IDEs like NetBeans or Eclipse. Using Java to\nspecify the application provides extra validation abilities of Java\ncompiler, such as compile time checks for type safety at the time of\nwriting the code. Later in this chapter you can read more about\nvalidation support in the platform.  The developer specifies the streaming application by implementing\nthe ApplicationFactory interface, which is how platform tools (CLI etc.)\nrecognize and instantiate applications. Here we show how to create a\nYahoo! Finance application that streams the last trade price of a ticker\nand computes the high and low price in every 1 min window. Run above\n test application\u00a0to execute the\nDAG in local mode within the IDE.  Let us revisit how the Yahoo! Finance 
 test application constructs the DAG:  public class Application implements StreamingApplication\n{\n\n  ...\n\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  {\n    dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds);\n\n    StockTickInput tick = getStockTickInputOperator( StockTickInput , dag);\n    SumKeyVal String, Long  dailyVolume = getDailyVolumeOperator( DailyVolume , dag);\n    ConsolidatorKeyVal String,Double,Long,String,?,?  quoteOperator = getQuoteOperator( Quote , dag);\n\n    RangeKeyVal String, Double  highlow = getHighLowOperator( HighLow , dag, appWindowCountMinute);\n    SumKeyVal String, Long  minuteVolume = getMinuteVolumeOperator( MinuteVolume , dag, appWindowCountMinute);\n    ConsolidatorKeyVal String,HighLow,Long,?,?,?  chartOperator = getChartOperator( Chart , dag);\n\n    SimpleMovingAverage String, Double  priceSMA = getPriceSimpleMovingAverageOperator( PriceSMA , dag, appWindowCountSMA);\n\n   
  dag.addStream( price , tick.price, quoteOperator.in1, highlow.data, priceSMA.data);\n    dag.addStream( vol , tick.volume, dailyVolume.data, minuteVolume.data);\n    dag.addStream( time , tick.time, quoteOperator.in3);\n    dag.addStream( daily_vol , dailyVolume.sum, quoteOperator.in2);\n\n    dag.addStream( quote_data , quoteOperator.out, getConsole( quoteConsole , dag,  QUOTE ));\n\n    dag.addStream( high_low , highlow.range, chartOperator.in1);\n    dag.addStream( vol_1min , minuteVolume.sum, chartOperator.in2);\n    dag.addStream( chart_data , chartOperator.out, getConsole( chartConsole , dag,  CHART ));\n\n    dag.addStream( sma_price , priceSMA.doubleSMA, getConsole( priceSMAConsole , dag,  Price SMA ));\n\n    return dag;\n  }\n}  Property File API  The platform also supports specification of a DAG via a property\nfile. The aim here to make it easy for tools to create and run an\napplication. This method of specification does not have the Java\ncompiler support of compile t
 ime check, but since these applications\nwould be created by software, they should be correct by construction.\nThe syntax is derived from Hadoop properties and should be easy for\nfolks who are used to creating software that integrated with\nHadoop.  Create an application (DAG): myApplication.properties  # input operator that reads from a file\ndt.operator.inputOp.classname=com.acme.SampleInputOperator\ndt.operator.inputOp.fileName=somefile.txt\n\n# output operator that writes to the console\ndt.operator.outputOp.classname=com.acme.ConsoleOutputOperator\n\n# stream connecting both operators\ndt.stream.inputStream.source=inputOp.outputPort\ndt.stream.inputStream.sinks=outputOp.inputPort  Above snippet is intended to convey the basic idea of specifying\nthe DAG without using Java. Operators would come from a predefined\nlibrary and referenced in the specification by class name and port names\n(obtained from the library providers documentation or runtime\nintrospection by tools). For 
 those interested in details, see later\nsections and refer to the  Operation and\nInstallation Guide\u00a0mentioned above.  Attributes  Attributes impact the runtime behavior of the application. They do\nnot impact the functionality. An example of an attribute is application\nname. Setting it changes the application name. Another example is\nstreaming window size. Setting it changes the streaming window size from\nthe default value to the specified value. Users cannot add new\nattributes, they can only choose from the ones that come packaged and\npre-supported by the platform. Details of attributes are covered in the\n Operation and Installation\nGuide.", 
+            "text": "This section introduces the API to write a streaming application.\nThe work involves connecting operators via streams to form the logical\nDAG. The steps are    Instantiate an application (DAG)    (Optional) Set Attributes   Assign application name  Set any other attributes as per application requirements     Create/re-use and instantiate operators   Assign operator name that is unique within the  application  Declare schema upfront for each operator (and thereby its ports)  (Optional) Set properties\u00a0 and attributes on the dag as per specification  Connect ports of operators via streams  Each stream connects one output port of an operator to one or  more input ports of other operators.  (Optional) Set attributes on the streams       Test the application.    There are two methods to create an application, namely Java, and\nProperties file. Java API is for applications being developed by humans,\nand properties file (Hadoop like) is more suited for DAGs gener
 ated by\ntools.", 
             "title": "Application API"
         }, 
         {
+            "location": "/application_development/#java-api", 
+            "text": "The Java API is the most common way to create a streaming\napplication. It is meant for application developers who prefer to\nleverage the features of Java, and the ease of use and enhanced\nproductivity provided by IDEs like NetBeans or Eclipse. Using Java to\nspecify the application provides extra validation abilities of Java\ncompiler, such as compile time checks for type safety at the time of\nwriting the code. Later in this chapter you can read more about\nvalidation support in the platform.  The developer specifies the streaming application by implementing\nthe ApplicationFactory interface, which is how platform tools (CLI etc.)\nrecognize and instantiate applications. Here we show how to create a\nYahoo! Finance application that streams the last trade price of a ticker\nand computes the high and low price in every 1 min window. Run above\n test application\u00a0to execute the\nDAG in local mode within the IDE.  Let us revisit how the Yahoo! Finance test a
 pplication constructs the DAG:  public class Application implements StreamingApplication\n{\n\n  ...\n\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  {\n    dag.getAttributes().attr(DAG.STRAM_WINDOW_SIZE_MILLIS).set(streamingWindowSizeMilliSeconds);\n\n    StockTickInput tick = getStockTickInputOperator( StockTickInput , dag);\n    SumKeyVal String, Long  dailyVolume = getDailyVolumeOperator( DailyVolume , dag);\n    ConsolidatorKeyVal String,Double,Long,String,?,?  quoteOperator = getQuoteOperator( Quote , dag);\n\n    RangeKeyVal String, Double  highlow = getHighLowOperator( HighLow , dag, appWindowCountMinute);\n    SumKeyVal String, Long  minuteVolume = getMinuteVolumeOperator( MinuteVolume , dag, appWindowCountMinute);\n    ConsolidatorKeyVal String,HighLow,Long,?,?,?  chartOperator = getChartOperator( Chart , dag);\n\n    SimpleMovingAverage String, Double  priceSMA = getPriceSimpleMovingAverageOperator( PriceSMA , dag, appWindowCountSMA);\n\n    dag.a
 ddStream( price , tick.price, quoteOperator.in1, highlow.data, priceSMA.data);\n    dag.addStream( vol , tick.volume, dailyVolume.data, minuteVolume.data);\n    dag.addStream( time , tick.time, quoteOperator.in3);\n    dag.addStream( daily_vol , dailyVolume.sum, quoteOperator.in2);\n\n    dag.addStream( quote_data , quoteOperator.out, getConsole( quoteConsole , dag,  QUOTE ));\n\n    dag.addStream( high_low , highlow.range, chartOperator.in1);\n    dag.addStream( vol_1min , minuteVolume.sum, chartOperator.in2);\n    dag.addStream( chart_data , chartOperator.out, getConsole( chartConsole , dag,  CHART ));\n\n    dag.addStream( sma_price , priceSMA.doubleSMA, getConsole( priceSMAConsole , dag,  Price SMA ));\n\n    return dag;\n  }\n}", 
+            "title": "Java API"
+        }, 
+        {
+            "location": "/application_development/#property-file-api", 
+            "text": "The platform also supports specification of a DAG via a property\nfile. The aim here to make it easy for tools to create and run an\napplication. This method of specification does not have the Java\ncompiler support of compile time check, but since these applications\nwould be created by software, they should be correct by construction.\nThe syntax is derived from Hadoop properties and should be easy for\nfolks who are used to creating software that integrated with\nHadoop.  Create an application (DAG): myApplication.properties  # input operator that reads from a file\ndt.operator.inputOp.classname=com.acme.SampleInputOperator\ndt.operator.inputOp.fileName=somefile.txt\n\n# output operator that writes to the console\ndt.operator.outputOp.classname=com.acme.ConsoleOutputOperator\n\n# stream connecting both operators\ndt.stream.inputStream.source=inputOp.outputPort\ndt.stream.inputStream.sinks=outputOp.inputPort  Above snippet is intended to convey the basic idea 
 of specifying\nthe DAG without using Java. Operators would come from a predefined\nlibrary and referenced in the specification by class name and port names\n(obtained from the library providers documentation or runtime\nintrospection by tools). For those interested in details, see later\nsections and refer to the  Operation and\nInstallation Guide\u00a0mentioned above.", 
+            "title": "Property File API"
+        }, 
+        {
+            "location": "/application_development/#attributes", 
+            "text": "Attributes impact the runtime behavior of the application. They do\nnot impact the functionality. An example of an attribute is application\nname. Setting it changes the application name. Another example is\nstreaming window size. Setting it changes the streaming window size from\nthe default value to the specified value. Users cannot add new\nattributes, they can only choose from the ones that come packaged and\npre-supported by the platform. Details of attributes are covered in the\n Operation and Installation\nGuide.", 
+            "title": "Attributes"
+        }, 
+        {
             "location": "/application_development/#operators", 
-            "text": "Operators\u00a0are basic compute units.\nOperators process each incoming tuple and emit zero or more tuples on\noutput ports as per the business logic. The data flow, connectivity,\nfault tolerance (node outage), etc. is taken care of by the platform. As\nan operator developer, all that is needed is to figure out what to do\nwith the incoming tuple and when (and which output port) to send out a\nparticular output tuple. Correctly designed operators will most likely\nget reused. Operator design needs care and foresight. For details, refer\nto the   Operator Developer Guide . As an application developer you need to connect operators\nin a way that it implements your business logic. You may also require\noperator customization for functionality and use attributes for\nperformance/scalability etc.  All operators process tuples asynchronously in a distributed\ncluster. An operator cannot assume or predict the exact time a tuple\nthat it emitted will get consumed by a
  downstream operator. An operator\nalso cannot predict the exact time when a tuple arrives from an upstream\noperator. The only guarantee is that the upstream operators are\nprocessing the current or a future window, i.e. the windowId of upstream\noperator is equals or exceeds its own windowId. Conversely the windowId\nof a downstream operator is less than or equals its own windowId. The\nend of a window operation, i.e. the API call to endWindow on an operator\nrequires that all upstream operators have finished processing this\nwindow. This means that completion of processing a window propagates in\na blocking fashion through an operator. Later sections provides more\ndetails on streams and data flow of tuples.  Each operator has a unique name within the DAG as provided by the\nuser. This is the name of the operator in the logical plan. The name of\nthe operator in the physical plan is an integer assigned to it by STRAM.\nThese integers are use the sequence from 1 to N, where N is t
 otal number\nof physically unique operators in the DAG. \u00a0Following the same rule,\neach partitioned instance of a logical operator has its own integer as\nan id. This id along with the Hadoop container name uniquely identifies\nthe operator in the execution plan of the DAG. The logical names and the\nphysical names are required for web service support. Operators can be\naccessed via both names. These same names are used while interacting\nwith  dtcli\u00a0to access an operator.\nIdeally these names should be self-descriptive. For example in Figure 1,\nthe node named \u201cDaily volume\u201d has a physical identifier of 2.  Operator Interface  Operator interface in a DAG consists of ports,\u00a0properties,\u00a0and attributes.\nOperators interact with other components of the DAG via ports. Functional behavior of the operators\ncan be customized via parameters. Run time performance and physical\ninstantiation is controlled by attributes. Ports and parameters are\nfields (variable
 s) of the Operator class/object, while attributes are\nmeta information that is attached to the operator object via an\nAttributeMap. An operator must have at least one port. Properties are\noptional. Attributes are provided by the platform and always have a\ndefault value that enables normal functioning of operators.  Ports  Ports are connection points by which an operator receives and\nemits tuples. These should be transient objects instantiated in the\noperator object, that implement particular interfaces. Ports should be\ntransient as they contain no state. They have a pre-defined schema and\ncan only be connected to other ports with the same schema. An input port\nneeds to implement the interface  Operator.InputPort\u00a0and\ninterface Sink. A default\nimplementation of these is provided by the abstract class DefaultInputPort. An output port needs to\nimplement the interface  Operator.OutputPort. A default implementation\nof this is provided by the concrete class DefaultOutputP
 ort. These two are a quick way to\nimplement the above interfaces, but operator developers have the option\nof providing their own implementations.  Here are examples of an input and an output port from the operator\nSum.  @InputPortFieldAnnotation(name =  data )\npublic final transient DefaultInputPort V  data = new DefaultInputPort V () {\n  @Override\n  public void process(V tuple)\n  {\n    ...\n  }\n}\n@OutputPortFieldAnnotation(optional=true)\npublic final transient DefaultOutputPort V  sum = new DefaultOutputPort V (){ \u2026 };  The process call is in the Sink interface. An emit on an output\nport is done via emit(tuple) call. For the above example it would be\nsum.emit(t), where the type of t is the generic parameter V.  There is no limit on how many ports an operator can have. However\nany operator must have at least one port. An operator with only one port\nis called an Input Adapter if it has no input port and an Output Adapter\nif it has no output port. These are specia
 l operators needed to get/read\ndata from outside system/source into the application, or push/write data\ninto an outside system/sink. These could be in Hadoop or outside of\nHadoop. These two operators are in essence gateways for the streaming\napplication to communicate with systems outside the application.  Port connectivity can be validated during compile time by adding\nPortFieldAnnotations shown above. By default all ports have to be\nconnected, to allow a port to go unconnected, you need to add\n\u201coptional=true\u201d to the annotation.  Attributes can be specified for ports that affect the runtime\nbehavior. An example of an attribute is parallel partition that specifes\na parallel computation flow per partition. It is described in detail in\nthe Parallel Partitions section. Another example is queue capacity that specifies the buffer size for the\nport. Details of attributes are covered in  Operation and Installation Guide.  Properties  Properties are the abstractions by 
 which functional behavior of an\noperator can be customized. They should be non-transient objects\ninstantiated in the operator object. They need to be non-transient since\nthey are part of the operator state and re-construction of the operator\nobject from its checkpointed state must restore the operator to the\ndesired state. Properties are optional, i.e. an operator may or may not\nhave properties; they are part of user code and their values are not\ninterpreted by the platform in any way.  All non-serializable objects should be declared transient.\nExamples include sockets, session information, etc. These objects should\nbe initialized during setup call, which is called every time the\noperator is initialized.  Attributes  Attributes are values assigned to the operators that impact\nrun-time. This includes things like the number of partitions, at most\nonce or at least once or exactly once recovery modes, etc. Attributes do\nnot impact functionality of the operator. Users can ch
 ange certain\nattributes in runtime. Users cannot add attributes to operators; they\nare pre-defined by the platform. They are interpreted by the platform\nand thus cannot be defined in user created code (like properties).\nDetails of attributes are covered in   Configuration Guide .  Operator State  The state of an operator is defined as the data that it transfers\nfrom one window to a future window. Since the computing model of the\nplatform is to treat windows like micro-batches, the operator state can\nbe checkpointed every Nth window, or every T units of time, where T is significantly greater\nthan the streaming window. \u00a0When an operator is checkpointed, the entire\nobject is written to HDFS. \u00a0The larger the amount of state in an\noperator, the longer it takes to recover from a failure. A stateless\noperator can recover much quicker than a stateful one. The needed\nwindows are preserved by the upstream buffer server and are used to\nrecompute the lost windows, and als
 o rebuild the buffer server in the\ncurrent container.  The distinction between Stateless and Stateful is based solely on\nthe need to transfer data in the operator from one window to the next.\nThe state of an operator is independent of the number of ports.  Stateless  A Stateless operator is defined as one where no data is needed to\nbe kept at the end of every window. This means that all the computations\nof a window can be derived from all the tuples the operator receives\nwithin that window. This guarantees that the output of any window can be\nreconstructed by simply replaying the tuples that arrived in that\nwindow. Stateless operators are more efficient in terms of fault\ntolerance, and cost to achieve SLA.  Stateful  A Stateful operator is defined as one where data is needed to be\nstored at the end of a window for computations occurring in later\nwindow; a common example is the computation of a sum of values in the\ninput tuples.  Operator API  The Operator API consists of
  methods that operator developers may\nneed to override. In this section we will discuss the Operator APIs from\nthe point of view of an application developer. Knowledge of how an\noperator works internally is critical for writing an application. Those\ninterested in the details should refer to  Malhar Operator Developer Guide.  The APIs are available in three modes, namely Single Streaming\nWindow, Sliding Application Window, and Aggregate Application Window.\nThese are not mutually exclusive, i.e. an operator can use single\nstreaming window as well as sliding application window. A physical\ninstance of an operator is always processing tuples from a single\nwindow. The processing of tuples is guaranteed to be sequential, no\nmatter which input port the tuples arrive on.  In the later part of this section we will evaluate three common\nuses of streaming windows by applications. They have different\ncharacteristics and implications on optimization and recovery mechanisms\n(i.e. algo
 rithm used to recover a node after outage) as discussed later\nin the section.  Streaming Window  Streaming window is atomic micro-batch computation period. The API\nmethods relating to a streaming window are as follows  public void process( tuple_type  tuple) // Called on the input port on which the tuple arrives\npublic void beginWindow(long windowId) // Called at the start of the window as soon as the first begin_window tuple arrives\npublic void endWindow() // Called at the end of the window after end_window tuples arrive on all input ports\npublic void setup(OperatorContext context) // Called once during initialization of the operator\npublic void teardown() // Called once when the operator is being shutdown  A tuple can be emitted in any of the three streaming run-time\ncalls, namely beginWindow, process, and endWindow but not in setup or\nteardown.  Aggregate Application Window  An operator with an aggregate window is stateful within the\napplication window timeframe and poss
 ibly stateless at the end of that\napplication window. An size of an aggregate application window is an\noperator attribute and is defined as a multiple of the streaming window\nsize. The platform recognizes this attribute and optimizes the operator.\nThe beginWindow, and endWindow calls are not invoked for those streaming\nwindows that do not align with the application window. For example in\ncase of streaming window of 0.5 second and application window of 5\nminute, an application window spans 600 streaming windows (5*60*2 =\n600). At the start of the sequence of these 600 atomic streaming\nwindows, a beginWindow gets invoked, and at the end of these 600\nstreaming windows an endWindow gets invoked. All the intermediate\nstreaming windows do not invoke beginWindow or endWindow. Bookkeeping,\nnode recovery, stats, UI, etc. continue to work off streaming windows.\nFor example if operators are being checkpointed say on an average every\n30th window, then the above application window 
 would have about 20\ncheckpoints.  Sliding Application Window  A sliding window is computations that requires previous N\nstreaming windows. After each streaming window the Nth past window is\ndropped and the new window is added to the computation. An operator with\nsliding window is a stateful operator at end of any window. The sliding\nwindow period is an attribute and is a multiple of streaming window. The\nplatform recognizes this attribute and leverages it during bookkeeping.\nA sliding aggregate window with tolerance to data loss does not have a\nvery high bookkeeping cost. The cost of all three recovery mechanisms,\n at most once\u00a0(data loss tolerant),\nat least once\u00a0(data loss\nintolerant), and exactly once\u00a0(data\nloss intolerant and no extra computations) is same as recovery\nmechanisms based on streaming window. STRAM is not able to leverage this\noperator for any extra optimization.  Single vs Multi-Input Operator  A single-input operator by definition has a
  single upstream\noperator, since there can only be one writing port for a stream. \u00a0If an\noperator has a single upstream operator, then the beginWindow on the\nupstream also blocks the beginWindow of the single-input operator. For\nan operator to start processing any window at least one upstream\noperator has to start processing that window. A multi-input operator\nreads from more than one upstream ports. Such an operator would start\nprocessing as soon as the first begin_window event arrives. However the\nwindow would not close (i.e. invoke endWindow) till all ports receive\nend_window events for that windowId. Thus the end of a window is a\nblocking event. As we saw earlier, a multi-input operator is also the\npoint in the DAG where windows of all upstream operators are\nsynchronized. The windows (atomic micro-batches) from a faster (or just\nahead in processing) upstream operators are queued up till the slower\nupstream operator catches up. STRAM monitors such bottlenecks a
 nd takes\ncorrective actions. The platform ensures minimal delay, i.e processing\nstarts as long as at least one upstream operator has started\nprocessing.  Recovery Mechanisms  Application developers can set any of the recovery mechanisms\nbelow to deal with node outage. In general, the cost of recovery depends\non the state of the operator, while data integrity is dependant on the\napplication. The mechanisms are per window as the platform treats\nwindows as atomic compute units. Three recovery mechanisms are\nsupported, namely   At-least-once: All atomic batches are processed at least once.\n    No data loss occurs.  At-most-once: All atomic batches are processed at most once.\n    Data loss is possible; this is the most efficient setting.  Exactly-once: All atomic batches are processed exactly once.\n    No data loss occurs; this is the least efficient setting since\n    additional work is needed to ensure proper semantics.   At-least-once is the default. During a recovery event
 , the\noperator connects to the upstream buffer server and asks for windows to\nbe replayed. At-least-once and exactly-once mechanisms start from its\ncheckpointed state. At-most-once starts from the next begin-window\nevent.  Recovery mechanisms can be specified per Operator while writing\nthe application as shown below.  Operator o = dag.addOperator(\u201coperator\u201d, \u2026);\ndag.setAttribute(o,  OperatorContext.PROCESSING_MODE,  ProcessingMode.AT_MOST_ONCE);  Also note that once an operator is attributed to AT_MOST_ONCE,\nall the operators downstream to it have to be AT_MOST_ONCE. The client\nwill give appropriate warnings or errors if that\u2019s not the case.  Details are explained in the chapter on Fault Tolerance below.", 
+            "text": "Operators\u00a0are basic compute units.\nOperators process each incoming tuple and emit zero or more tuples on\noutput ports as per the business logic. The data flow, connectivity,\nfault tolerance (node outage), etc. is taken care of by the platform. As\nan operator developer, all that is needed is to figure out what to do\nwith the incoming tuple and when (and which output port) to send out a\nparticular output tuple. Correctly designed operators will most likely\nget reused. Operator design needs care and foresight. For details, refer\nto the   Operator Developer Guide . As an application developer you need to connect operators\nin a way that it implements your business logic. You may also require\noperator customization for functionality and use attributes for\nperformance/scalability etc.  All operators process tuples asynchronously in a distributed\ncluster. An operator cannot assume or predict the exact time a tuple\nthat it emitted will get consumed by a
  downstream operator. An operator\nalso cannot predict the exact time when a tuple arrives from an upstream\noperator. The only guarantee is that the upstream operators are\nprocessing the current or a future window, i.e. the windowId of upstream\noperator is equals or exceeds its own windowId. Conversely the windowId\nof a downstream operator is less than or equals its own windowId. The\nend of a window operation, i.e. the API call to endWindow on an operator\nrequires that all upstream operators have finished processing this\nwindow. This means that completion of processing a window propagates in\na blocking fashion through an operator. Later sections provides more\ndetails on streams and data flow of tuples.  Each operator has a unique name within the DAG as provided by the\nuser. This is the name of the operator in the logical plan. The name of\nthe operator in the physical plan is an integer assigned to it by STRAM.\nThese integers are use the sequence from 1 to N, where N is t
 otal number\nof physically unique operators in the DAG. \u00a0Following the same rule,\neach partitioned instance of a logical operator has its own integer as\nan id. This id along with the Hadoop container name uniquely identifies\nthe operator in the execution plan of the DAG. The logical names and the\nphysical names are required for web service support. Operators can be\naccessed via both names. These same names are used while interacting\nwith  dtcli\u00a0to access an operator.\nIdeally these names should be self-descriptive. For example in Figure 1,\nthe node named \u201cDaily volume\u201d has a physical identifier of 2.", 
             "title": "Operators"
         }, 
         {
+            "location": "/application_development/#operator-interface", 
+            "text": "Operator interface in a DAG consists of ports,\u00a0properties,\u00a0and attributes.\nOperators interact with other components of the DAG via ports. Functional behavior of the operators\ncan be customized via parameters. Run time performance and physical\ninstantiation is controlled by attributes. Ports and parameters are\nfields (variables) of the Operator class/object, while attributes are\nmeta information that is attached to the operator object via an\nAttributeMap. An operator must have at least one port. Properties are\noptional. Attributes are provided by the platform and always have a\ndefault value that enables normal functioning of operators.", 
+            "title": "Operator Interface"
+        }, 
+        {
+            "location": "/application_development/#ports", 
+            "text": "Ports are connection points by which an operator receives and\nemits tuples. These should be transient objects instantiated in the\noperator object, that implement particular interfaces. Ports should be\ntransient as they contain no state. They have a pre-defined schema and\ncan only be connected to other ports with the same schema. An input port\nneeds to implement the interface  Operator.InputPort\u00a0and\ninterface Sink. A default\nimplementation of these is provided by the abstract class DefaultInputPort. An output port needs to\nimplement the interface  Operator.OutputPort. A default implementation\nof this is provided by the concrete class DefaultOutputPort. These two are a quick way to\nimplement the above interfaces, but operator developers have the option\nof providing their own implementations.  Here are examples of an input and an output port from the operator\nSum.  @InputPortFieldAnnotation(name =  data )\npublic final transient DefaultInputPort V 
  data = new DefaultInputPort V () {\n  @Override\n  public void process(V tuple)\n  {\n    ...\n  }\n}\n@OutputPortFieldAnnotation(optional=true)\npublic final transient DefaultOutputPort V  sum = new DefaultOutputPort V (){ \u2026 };  The process call is in the Sink interface. An emit on an output\nport is done via emit(tuple) call. For the above example it would be\nsum.emit(t), where the type of t is the generic parameter V.  There is no limit on how many ports an operator can have. However\nany operator must have at least one port. An operator with only one port\nis called an Input Adapter if it has no input port and an Output Adapter\nif it has no output port. These are special operators needed to get/read\ndata from outside system/source into the application, or push/write data\ninto an outside system/sink. These could be in Hadoop or outside of\nHadoop. These two operators are in essence gateways for the streaming\napplication to communicate with systems outside the applicati
 on.  Port connectivity can be validated during compile time by adding\nPortFieldAnnotations shown above. By default all ports have to be\nconnected, to allow a port to go unconnected, you need to add\n\u201coptional=true\u201d to the annotation.  Attributes can be specified for ports that affect the runtime\nbehavior. An example of an attribute is parallel partition that specifes\na parallel computation flow per partition. It is described in detail in\nthe Parallel Partitions section. Another example is queue capacity that specifies the buffer size for the\nport. Details of attributes are covered in  Operation and Installation Guide.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/application_development/#properties", 
+            "text": "Properties are the abstractions by which functional behavior of an\noperator can be customized. They should be non-transient objects\ninstantiated in the operator object. They need to be non-transient since\nthey are part of the operator state and re-construction of the operator\nobject from its checkpointed state must restore the operator to the\ndesired state. Properties are optional, i.e. an operator may or may not\nhave properties; they are part of user code and their values are not\ninterpreted by the platform in any way.  All non-serializable objects should be declared transient.\nExamples include sockets, session information, etc. These objects should\nbe initialized during setup call, which is called every time the\noperator is initialized.", 
+            "title": "Properties"
+        }, 
+        {
+            "location": "/application_development/#attributes_1", 
+            "text": "Attributes are values assigned to the operators that impact\nrun-time. This includes things like the number of partitions, at most\nonce or at least once or exactly once recovery modes, etc. Attributes do\nnot impact functionality of the operator. Users can change certain\nattributes in runtime. Users cannot add attributes to operators; they\nare pre-defined by the platform. They are interpreted by the platform\nand thus cannot be defined in user created code (like properties).\nDetails of attributes are covered in   Configuration Guide .", 
+            "title": "Attributes"
+        }, 
+        {
+            "location": "/application_development/#operator-state", 
+            "text": "The state of an operator is defined as the data that it transfers\nfrom one window to a future window. Since the computing model of the\nplatform is to treat windows like micro-batches, the operator state can\nbe checkpointed every Nth window, or every T units of time, where T is significantly greater\nthan the streaming window. \u00a0When an operator is checkpointed, the entire\nobject is written to HDFS. \u00a0The larger the amount of state in an\noperator, the longer it takes to recover from a failure. A stateless\noperator can recover much quicker than a stateful one. The needed\nwindows are preserved by the upstream buffer server and are used to\nrecompute the lost windows, and also rebuild the buffer server in the\ncurrent container.  The distinction between Stateless and Stateful is based solely on\nthe need to transfer data in the operator from one window to the next.\nThe state of an operator is independent of the number of ports.", 
+            "title": "Operator State"
+        }, 
+        {
+            "location": "/application_development/#stateless", 
+            "text": "A Stateless operator is defined as one where no data is needed to\nbe kept at the end of every window. This means that all the computations\nof a window can be derived from all the tuples the operator receives\nwithin that window. This guarantees that the output of any window can be\nreconstructed by simply replaying the tuples that arrived in that\nwindow. Stateless operators are more efficient in terms of fault\ntolerance, and cost to achieve SLA.", 
+            "title": "Stateless"
+        }, 
+        {
+            "location": "/application_development/#stateful", 
+            "text": "A Stateful operator is defined as one where data is needed to be\nstored at the end of a window for computations occurring in later\nwindow; a common example is the computation of a sum of values in the\ninput tuples.", 
+            "title": "Stateful"
+        }, 
+        {
+            "location": "/application_development/#operator-api", 
+            "text": "The Operator API consists of methods that operator developers may\nneed to override. In this section we will discuss the Operator APIs from\nthe point of view of an application developer. Knowledge of how an\noperator works internally is critical for writing an application. Those\ninterested in the details should refer to  Malhar Operator Developer Guide.  The APIs are available in three modes, namely Single Streaming\nWindow, Sliding Application Window, and Aggregate Application Window.\nThese are not mutually exclusive, i.e. an operator can use single\nstreaming window as well as sliding application window. A physical\ninstance of an operator is always processing tuples from a single\nwindow. The processing of tuples is guaranteed to be sequential, no\nmatter which input port the tuples arrive on.  In the later part of this section we will evaluate three common\nuses of streaming windows by applications. They have different\ncharacteristics and implications on
  optimization and recovery mechanisms\n(i.e. algorithm used to recover a node after outage) as discussed later\nin the section.", 
+            "title": "Operator API"
+        }, 
+        {
+            "location": "/application_development/#streaming-window", 
+            "text": "Streaming window is atomic micro-batch computation period. The API\nmethods relating to a streaming window are as follows  public void process( tuple_type  tuple) // Called on the input port on which the tuple arrives\npublic void beginWindow(long windowId) // Called at the start of the window as soon as the first begin_window tuple arrives\npublic void endWindow() // Called at the end of the window after end_window tuples arrive on all input ports\npublic void setup(OperatorContext context) // Called once during initialization of the operator\npublic void teardown() // Called once when the operator is being shutdown  A tuple can be emitted in any of the three streaming run-time\ncalls, namely beginWindow, process, and endWindow but not in setup or\nteardown.", 
+            "title": "Streaming Window"
+        }, 
+        {
+            "location": "/application_development/#aggregate-application-window", 
+            "text": "An operator with an aggregate window is stateful within the\napplication window timeframe and possibly stateless at the end of that\napplication window. An size of an aggregate application window is an\noperator attribute and is defined as a multiple of the streaming window\nsize. The platform recognizes this attribute and optimizes the operator.\nThe beginWindow, and endWindow calls are not invoked for those streaming\nwindows that do not align with the application window. For example in\ncase of streaming window of 0.5 second and application window of 5\nminute, an application window spans 600 streaming windows (5*60*2 =\n600). At the start of the sequence of these 600 atomic streaming\nwindows, a beginWindow gets invoked, and at the end of these 600\nstreaming windows an endWindow gets invoked. All the intermediate\nstreaming windows do not invoke beginWindow or endWindow. Bookkeeping,\nnode recovery, stats, UI, etc. continue to work off streaming windows.\nF
 or example if operators are being checkpointed say on an average every\n30th window, then the above application window would have about 20\ncheckpoints.", 
+            "title": "Aggregate Application Window"
+        }, 
+        {
+            "location": "/application_development/#sliding-application-window", 
+            "text": "A sliding window is computations that requires previous N\nstreaming windows. After each streaming window the Nth past window is\ndropped and the new window is added to the computation. An operator with\nsliding window is a stateful operator at end of any window. The sliding\nwindow period is an attribute and is a multiple of streaming window. The\nplatform recognizes this attribute and leverages it during bookkeeping.\nA sliding aggregate window with tolerance to data loss does not have a\nvery high bookkeeping cost. The cost of all three recovery mechanisms,\n at most once\u00a0(data loss tolerant),\nat least once\u00a0(data loss\nintolerant), and exactly once\u00a0(data\nloss intolerant and no extra computations) is same as recovery\nmechanisms based on streaming window. STRAM is not able to leverage this\noperator for any extra optimization.", 
+            "title": "Sliding Application Window"
+        }, 
+        {
+            "location": "/application_development/#single-vs-multi-input-operator", 
+            "text": "A single-input operator by definition has a single upstream\noperator, since there can only be one writing port for a stream. \u00a0If an\noperator has a single upstream operator, then the beginWindow on the\nupstream also blocks the beginWindow of the single-input operator. For\nan operator to start processing any window at least one upstream\noperator has to start processing that window. A multi-input operator\nreads from more than one upstream ports. Such an operator would start\nprocessing as soon as the first begin_window event arrives. However the\nwindow would not close (i.e. invoke endWindow) till all ports receive\nend_window events for that windowId. Thus the end of a window is a\nblocking event. As we saw earlier, a multi-input operator is also the\npoint in the DAG where windows of all upstream operators are\nsynchronized. The windows (atomic micro-batches) from a faster (or just\nahead in processing) upstream operators are queued up till the slower\
 nupstream operator catches up. STRAM monitors such bottlenecks and takes\ncorrective actions. The platform ensures minimal delay, i.e processing\nstarts as long as at least one upstream operator has started\nprocessing.", 
+            "title": "Single vs Multi-Input Operator"
+        }, 
+        {
+            "location": "/application_development/#recovery-mechanisms", 
+            "text": "Application developers can set any of the recovery mechanisms\nbelow to deal with node outage. In general, the cost of recovery depends\non the state of the operator, while data integrity is dependant on the\napplication. The mechanisms are per window as the platform treats\nwindows as atomic compute units. Three recovery mechanisms are\nsupported, namely   At-least-once: All atomic batches are processed at least once.\n    No data loss occurs.  At-most-once: All atomic batches are processed at most once.\n    Data loss is possible; this is the most efficient setting.  Exactly-once: All atomic batches are processed exactly once.\n    No data loss occurs; this is the least efficient setting since\n    additional work is needed to ensure proper semantics.   At-least-once is the default. During a recovery event, the\noperator connects to the upstream buffer server and asks for windows to\nbe replayed. At-least-once and exactly-once mechanisms start from its\ncheckp
 ointed state. At-most-once starts from the next begin-window\nevent.  Recovery mechanisms can be specified per Operator while writing\nthe application as shown below.  Operator o = dag.addOperator(\u201coperator\u201d, \u2026);\ndag.setAttribute(o,  OperatorContext.PROCESSING_MODE,  ProcessingMode.AT_MOST_ONCE);  Also note that once an operator is attributed to AT_MOST_ONCE,\nall the operators downstream to it have to be AT_MOST_ONCE. The client\nwill give appropriate warnings or errors if that\u2019s not the case.  Details are explained in the chapter on Fault Tolerance below.", 
+            "title": "Recovery Mechanisms"
+        }, 
+        {
             "location": "/application_development/#streams", 
             "text": "A stream\u00a0is a connector\n(edge) abstraction, and is a fundamental building block of the platform.\nA stream consists of tuples that flow from one port (called the\noutput\u00a0port) to one or more ports\non other operators (called  input\u00a0ports) another -- so note a potentially\nconfusing aspect of this terminology: tuples enter a stream through its\noutput port and leave via one or more input ports. A stream has the\nfollowing characteristics   Tuples are always delivered in the same order in which they\n    were emitted.  Consists of a sequence of windows one after another. Each\n    window being a collection of in-order tuples.  A stream that connects two containers passes through a\n    buffer server.  All streams can be persisted (by default in HDFS).  Exactly one output port writes to the stream.  Can be read by one or more input ports.  Connects operators within an application, not outside\n    an application.  Has an unique name within an applic
 ation.  Has attributes which act as hints to STRAM.   Streams have four modes, namely in-line, in-node, in-rack,\n    and other. Modes may be overruled (for example due to lack\n    of containers). They are defined as follows:   THREAD_LOCAL: In the same thread, uses thread\n    stack (intra-thread). This mode can only be used for a downstream\n    operator which has only one input port connected; also called\n    in-line.  CONTAINER_LOCAL: In the same container (intra-process); also\n    called in-container.  NODE_LOCAL: In the same Hadoop node (inter processes, skips\n    NIC); also called in-node.  RACK_LOCAL: On nodes in the same rack; also called\n    in-rack.  unspecified: No guarantee. Could be anywhere within the\n    cluster     An example of a stream declaration is given below  DAG dag = new DAG();\n \u2026\ndag.addStream( views , viewAggregate.sum, cost.data).setLocality(CONTAINER_LOCAL); // A container local  stream\ndag.addStream(\u201cclicks\u201d, clickAggregate.sum, 
 rev.data); // An example of unspecified locality  The platform guarantees in-order delivery of tuples in a stream.\nSTRAM views each stream as collection of ordered windows. Since no tuple\ncan exist outside a window, a replay of a stream consists of replay of a\nset of windows. When multiple input ports read the same stream, the\nexecution plan of a stream ensures that each input port is logically not\nblocked by the reading of another input port. The schema of a stream is\nsame as the schema of the tuple.  In a stream all tuples emitted by an operator in a window belong\nto that window. A replay of this window would consists of an in-order\nreplay of all the tuples. Thus the tuple order within a stream is\nguaranteed. However since an operator may receive multiple streams (for\nexample an operator with two input ports), the order of arrival of two\ntuples belonging to different streams is not guaranteed. In general in\nan asynchronous distributed architecture this is expected. Thu
 s the\noperator (specially one with multiple input ports) should not depend on\nthe tuple order from two streams. One way to cope with this\nindeterminate order, if necessary, is to wait to get all the tuples of a\nwindow and emit results in endWindow call. All operator templates\nprovided as part of Malhar operator library follow these principles.  A logical stream gets partitioned into physical streams each\nconnecting the partition to the upstream operator. If two different\nattributes are needed on the same stream, it should be split using\nStreamDuplicator\u00a0operator.  Modes of the streams are critical for performance. An in-line\nstream is the most optimal as it simply delivers the tuple as-is without\nserialization-deserialization. Streams should be marked\ncontainer_local, specially in case where there is a large tuple volume\nbetween two operators which then on drops significantly. Since the\nsetLocality call merely provides a hint, STRAM may ignore it. An In-node\nstrea
 m is not as efficient as an in-line one, but it is clearly better\nthan going off-node since it still avoids the potential bottleneck of\nthe network card.  THREAD_LOCAL and CONTAINER_LOCAL streams do not use a buffer\nserver as this stream is in a single process. The other two do.", 
             "title": "Streams"
         }, 
         {
             "location": "/application_development/#validating-an-application", 
-            "text": "The platform provides various ways of validating the application\nspecification and data input. An understanding of these checks is very\nimportant for an application developer since it affects productivity.\nValidation of an application is done in three phases, namely   Compile Time: Caught during application development, and is\n    most cost effective. These checks are mainly done on declarative\n    objects and leverages the Java compiler. An example is checking that\n    the schemas specified on all ports of a stream are\n    mutually compatible.  Initialization Time: When the application is being\n    initialized, before submitting to Hadoop. These checks are related\n    to configuration/context of an application, and are done by the\n    logical DAG builder implementation. An example is the checking that\n    all non-optional ports are connected to other ports.  Run Time: Validations done when the application is running.\n    This is the costliest of all
  checks. These are checks that can only\n    be done at runtime as they involve data. For example divide by 0\n    check as part of business logic.   Compile Time  Compile time validations apply when an application is specified in\nJava code and include all checks that can be done by Java compiler in\nthe development environment (including IDEs like NetBeans or Eclipse).\nExamples include   Schema Validation: The tuples on ports are POJO (plain old\n    java objects) and compiler checks to ensure that all the ports on a\n    stream have the same schema.  Stream Check: Single Output Port and at least one Input port\n    per stream. A stream can only have one output port writer. This is\n    part of the addStream api. This\n    check ensures that developers only connect one output port to\n    a stream. The same signature also ensures that there is at least one\n    input port for a stream  Naming: Compile time checks ensures that applications\n    components operators, streams are na
 med   Initialization/Instantiation Time  Initialization time validations include various checks that are\ndone post compile, and before the application starts running in a\ncluster (or local mode). These are mainly configuration/contextual in\nnature. These checks are as critical to proper functionality of the\napplication as the compile time validations.  Examples include    JavaBeans Validation :\n    Examples include   @Max(): Value must be less than or equal to the number  @Min(): Value must be greater than or equal to the\n    number  @NotNull: The value of the field or property must not be\n    null  @Pattern(regexp = \u201c....\u201d): Value must match the regular\n    expression  Input port connectivity: By default, every non-optional input\n    port must be connected. A port can be declared optional by using an\n    annotation: \u00a0 \u00a0 @InputPortFieldAnnotation(name = \"...\", optional\n    = true)  Output Port Connectivity: Similar. The annotation here is: \u00a0 \u0
 0a0\n    @OutputPortFieldAnnotation(name = \"...\", optional = true)     Unique names in application scope: Operators, streams, must have\n    unique names.   Cycles in the dag: DAG cannot have a cycle.  Unique names in operator scope: Ports, properties, annotations\n    must have unique names.  One stream per port: A port can connect to only one stream.\n    This check applies to input as well as output ports even though an\n    output port can technically write to two streams. If you must have\n    two streams originating from a single output port, use \u00a0a\u00a0streamDuplicator operator.  Application Window Period: Has to be an integral multiple the\n    streaming window period.   Run Time  Run time checks are those that are done when the application is\nrunning. The real-time streaming platform provides rich run time error\nhandling mechanisms. The checks are exclusively done by the application\nbusiness logic, but the platform allows applications to count and audit\nthese. S
 ome of these features are in the process of development (backend\nand UI) and this section will be updated as they are developed. Upon\ncompletion examples will be added to demos to illustrate these.  Error ports are output ports with error annotations. Since they\nare normal ports, they can be monitored and tuples counted, persisted\nand counts shown in the UI.", 
+            "text": "The platform provides various ways of validating the application\nspecification and data input. An understanding of these checks is very\nimportant for an application developer since it affects productivity.\nValidation of an application is done in three phases, namely   Compile Time: Caught during application development, and is\n    most cost effective. These checks are mainly done on declarative\n    objects and leverages the Java compiler. An example is checking that\n    the schemas specified on all ports of a stream are\n    mutually compatible.  Initialization Time: When the application is being\n    initialized, before submitting to Hadoop. These checks are related\n    to configuration/context of an application, and are done by the\n    logical DAG builder implementation. An example is the checking that\n    all non-optional ports are connected to other ports.  Run Time: Validations done when the application is running.\n    This is the costliest of all
  checks. These are checks that can only\n    be done at runtime as they involve data. For example divide by 0\n    check as part of business logic.", 
             "title": "Validating an Application"
         }, 
         {
+            "location": "/application_development/#compile-time", 
+            "text": "Compile time validations apply when an application is specified in\nJava code and include all checks that can be done by Java compiler in\nthe development environment (including IDEs like NetBeans or Eclipse).\nExamples include   Schema Validation: The tuples on ports are POJO (plain old\n    java objects) and compiler checks to ensure that all the ports on a\n    stream have the same schema.  Stream Check: Single Output Port and at least one Input port\n    per stream. A stream can only have one output port writer. This is\n    part of the addStream api. This\n    check ensures that developers only connect one output port to\n    a stream. The same signature also ensures that there is at least one\n    input port for a stream  Naming: Compile time checks ensures that applications\n    components operators, streams are named", 
+            "title": "Compile Time"
+        }, 
+        {
+            "location": "/application_development/#initializationinstantiation-time", 
+            "text": "Initialization time validations include various checks that are\ndone post compile, and before the application starts running in a\ncluster (or local mode). These are mainly configuration/contextual in\nnature. These checks are as critical to proper functionality of the\napplication as the compile time validations.  Examples include    JavaBeans Validation :\n    Examples include   @Max(): Value must be less than or equal to the number  @Min(): Value must be greater than or equal to the\n    number  @NotNull: The value of the field or property must not be\n    null  @Pattern(regexp = \u201c....\u201d): Value must match the regular\n    expression  Input port connectivity: By default, every non-optional input\n    port must be connected. A port can be declared optional by using an\n    annotation: \u00a0 \u00a0 @InputPortFieldAnnotation(name = \"...\", optional\n    = true)  Output Port Connectivity: Similar. The annotation here is: \u00a0 \u00a0\n    @OutputPort
 FieldAnnotation(name = \"...\", optional = true)     Unique names in application scope: Operators, streams, must have\n    unique names.   Cycles in the dag: DAG cannot have a cycle.  Unique names in operator scope: Ports, properties, annotations\n    must have unique names.  One stream per port: A port can connect to only one stream.\n    This check applies to input as well as output ports even though an\n    output port can technically write to two streams. If you must have\n    two streams originating from a single output port, use \u00a0a\u00a0streamDuplicator operator.  Application Window Period: Has to be an integral multiple the\n    streaming window period.", 
+            "title": "Initialization/Instantiation Time"
+        }, 
+        {
+            "location": "/application_development/#run-time", 
+            "text": "Run time checks are those that are done when the application is\nrunning. The real-time streaming platform provides rich run time error\nhandling mechanisms. The checks are exclusively done by the application\nbusiness logic, but the platform allows applications to count and audit\nthese. Some of these features are in the process of development (backend\nand UI) and this section will be updated as they are developed. Upon\ncompletion examples will be added to demos to illustrate these.  Error ports are output ports with error annotations. Since they\nare normal ports, they can be monitored and tuples counted, persisted\nand counts shown in the UI.", 
+            "title": "Run Time"
+        }, 
+        {
             "location": "/application_development/#multi-tenancy-and-security", 
             "text": "Hadoop is a multi-tenant distributed operating system. Security is\nan intrinsic element of multi-tenancy as without it a cluster cannot be\nreasonably be shared among enterprise applications. Streaming\napplications follow all multi-tenancy security models used in Hadoop as\nthey are native Hadoop applications.", 
             "title": "Multi-Tenancy and Security"
@@ -147,10 +307,40 @@
         }, 
         {
             "location": "/application_development/#partitioning", 
-            "text": "If all tuples sent through the stream(s) that are connected to the\ninput port(s) of an operator in the DAG are received by a single\nphysical instance of that operator, that operator can become a\nperformance bottleneck. This leads to scalability issues when\nthroughput, memory, or CPU needs exceed the processing capacity of that\nsingle instance.  To address the problem, the platform offers the capability to\npartition the inflow of data so that it is divided across multiple\nphysical instances of a logical operator in the DAG. There are two\nfunctional ways to partition   Load balance: Incoming load is simply partitioned\n    into stream(s) that go to separate instances of physical operators\n    and scalability is achieved via adding more physical operators. Each\n    tuple is sent to physical operator (partition) based on a\n    round-robin or other similar algorithm. This scheme scales linearly.\n    A lot of key based computations can load balance in the 
 platform due\n    to the ability to insert  Unifiers. For many computations, the\n    endWindow and Unifier setup is similar to the combiner and reducer\n    mechanism in a Map-Reduce computation.  Sticky Key: The key assertion is that distribution of tuples\n    are sticky, i.e the data with\n    same key will always be processed by the same physical operator, no\n    matter how many times it is sent through the stream. This stickiness\n    will continue even if the number of partitions grows dynamically and\n    can eventually be leveraged for advanced features like\n    bucket testing. How this is accomplished and what is required to\n    develop compliant operators will be explained below.   We plan to add more partitioning mechanisms proactively to the\nplatform over time as needed by emerging usage patterns. The aim is to\nallow enterprises to be able to focus on their business logic, and\nsignificantly reduce the cost of operability. As an enabling technology\nfor managing hi
 gh loads, this platform provides enterprises with a\nsignificant innovative edge. Scalability and Partitioning is a\nfoundational building block for this platform.  Sticky Partition vs Round Robin  As noted above, partitioning via sticky key is data aware but\nround-robin partitioning is not. An example for non-sticky load\nbalancing would be round robin distribution over multiple instances,\nwhere for example a tuple stream of  A, A,\nA with 3 physical operator\ninstances would result in processing of a single A by each of the instances, In contrast, sticky\npartitioning means that exactly one instance of the operators will\nprocess all of the  Atuples if they\nfall into the same bucket, while B\nmay be processed by another operator. Data aware mapping of\ntuples to partitions (similar to distributed hash table) is accomplished\nvia Stream Codecs. In later sections we would show how these two\napproaches can be used in combination.  Stream Codec  The platform does not make assumpti
 ons about the tuple\ntype, it could be any Java object. The operator developer knows what\ntuple type an input port expects and is capable of processing. Each\ninput port has a stream codec \u00a0associated thatdefines how data is serialized when transmitted over a socket\nstream; it also defines another\nfunction that computes the partition hash key for the tuple. The engine\nuses that key to determine which physical instance(s) \u00a0(for a\npartitioned operator) receive that \u00a0tuple. For this to work, consistent hashing is required.\nThe default codec uses the Java Object#hashCode function, which is\nsufficient for basic types such as Integer, String etc. It will also\nwork with custom tuple classes as long as they implement hashCode\nappropriately. Reliance on hashCode may not work when generic containers\nare used that do not hash the actual data, such as standard collection\nclasses (HashMap etc.), in which case a custom stream codec must be\nassigned to the input port.  S
 tatic Partitioning  DAG designers can specify at design time how they would like\ncertain operators to be partitioned. STRAM then instantiates the DAG\nwith the physical plan which adheres to the partitioning scheme defined\nby the design. This plan is the initial partition of the application. In\nother words, Static Partitioning is used to tell STRAM to compute the\nphysical DAG from a logical DAG once, without taking into consideration\nruntime states or loads of various operators.  Dynamic Partitioning  In streaming applications the load changes during the day, thus\ncreating situations where the number of partitioned operator instances\nneeds to adjust dynamically. The load can be measured in terms of\nprocessing within the DAG based on throughput, or latency, or\nconsiderations in external system components (time based etc.) that the\nplatform may not be aware of. Whatever the trigger, the resource\nrequirement for the current processing needs to be adjusted at run-time.\nThe p
 latform may detect that operator instances are over or under\nutilized and may need to dynamically adjust the number of instances on\nthe fly. More instances of a logical operator may be required (partition\nsplit) or underutilized operator instances may need decommissioning\n(partition merge). We refer to either of the changes as dynamic\npartitioning. The default partitioning scheme supports split and merge\nof partitions, but without state transfer. The contract of the\nPartitioner\u00a0interface allows the operator\ndeveloper to implement split/merge and the associated state transfer, if\nnecessary.  Since partitioning is a key scalability measure, our goal is to\nmake it as simple as possible without removing the flexibility needed\nfor sophisticated applications. Basic partitioning can be enabled at\ncompile time through the DAG specification. A slightly involved\npartitioning involves writing custom codecs to calculate data aware\npartitioning scheme. More complex partitionin
 g cases may require users\nto provide a custom implementation of Partitioner, which gives the\ndeveloper full control over state transfer between multiple instances of\nthe partitioned operator.  Default Partitioning  The platform provides a default partitioning implementation that\ncan be enabled without implementing Partitioner\u00a0(or writing any other extra Java\ncode), which is designed to support simple sticky partitioning out of\nthe box for operators with logic agnostic to the partitioning scheme\nthat can be enabled by means of DAG construction alone.  Typically an operator that can work with the default partitioning\nscheme would have a single input port. If there are multiple input\nports, only one port will be partitioned (the port first connected in\nthe DAG). T

<TRUNCATED>