Posted to commits@apex.apache.org by th...@apache.org on 2017/03/28 06:32:49 UTC

[07/16] apex-site git commit: Adding malhar-3.7.0 documentation

http://git-wip-us.apache.org/repos/asf/apex-site/blob/b25c090d/docs/malhar-3.7/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/docs/malhar-3.7/mkdocs/search_index.json b/docs/malhar-3.7/mkdocs/search_index.json
new file mode 100644
index 0000000..5ded716
--- /dev/null
+++ b/docs/malhar-3.7/mkdocs/search_index.json
@@ -0,0 +1,2134 @@
+{
+    "docs": [
+        {
+            "location": "/", 
+            "text": "Apache Apex Malhar\n\n\nApache Apex Malhar is an open source operator and codec library that can be used with the \nApache Apex\n platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of example applications, demonstrating operator features and capabilities.\n\n\n\n\nCapabilities common across Malhar operators\n\n\nFor most streaming platforms, connectors are afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the platform. As a result they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have following core streaming runtime capabilities\n\n\n\n\nFault tolerance\n \u2013 Malhar operators where app
 licable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario.\n\n\nProcessing guarantees\n \u2013 Malhar operators where applicable provide out of the box support for ALL three processing guarantees \u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code.  Some operators, like MQTT operator, deal with source systems that can not track processed data and hence need the operators to keep track of the data.  Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this.  Finally for databases that support transactions or support any sort of atomic batch operations Malhar operators can do exactly once down to the tuple level.\n\n\nDynamic updates\n \u2013 Based on changing business conditions you often have to tweak several parameters used by the operators in your streaming application without inc
 urring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application.\n\n\nEase of extensibility\n \u2013 Malhar operators are based on templates that are easy to extend.\n\n\nPartitioning support\n \u2013 In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system.  For example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.\n\n\n\n\nOperator Library Overview\n\n\nInput/output connectors\n\n\nBelow is a summary of the various sub categories of input and output operators. Input operators also have a corresponding output operator\n\n\n\n\nFile Systems\n \u2013 Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the appli
 cation is running in AWS.  Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share.  Apex supports input \n output operators for HDFS, S3, NFS \n Local Files.  There are also File Splitter and Block Reader operators, which can accelerate processing of large files by splitting and parallelizing the work across non-overlapping sets of file blocks.\n\n\nRelational Databases\n \u2013 Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational dashboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.\n\n\nNoSQL Databases\n \u2013 NoSQL key-value pair databases like Cassandra \n HBase are a common part of streaming analytics applica
 tion architectures to lookup reference data or store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB.\n\n\nMessaging Systems\n \u2013 Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises.  Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages.\n\n\nNotification Systems\n \u2013 Malhar includes an operator for sending notifications via SMTP.\n\n\nIn-memory Databases \n Caching platforms\n - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar has operators for memcached and Redis.\n\n\nSocial Media\n - Malhar includes an operator to connect to the popular Twitter stream fire hose.\n\n\nProtocols\n - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and M
 QTT.\n\n\n\n\nParsers\n\n\nThere are many industry vertical specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example in the Telco space, a Java based CDR parser can be directly plugged into Apache Apex operator. To further simplify development experience, Malhar also provides some operators for parsing common formats like XML (DOM \n SAX), JSON (flat map converter), Apache log files, syslog, etc.\n\n\nStream manipulation\n\n\nStreaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW.  Malhar has several operators to perform the common stream manipulation actions like \u2013 GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, O
 uter join, Select, Update etc.\n\n\nCompute\n\n\nOne of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support variety of common compute functions as the user then has to worry about making these scalable, fault tolerant, stateful, etc.  Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators.\n\n\nBelow is just a snapshot of the compute operators available in Malhar\n\n\n\n\nStatistics and math - Various mathematical and statistical computations over application defined time windows.\n\n\nFiltering and pattern matching\n\n\nSorting, maps, frequency, TopN, BottomN\n\n\nRandom data generators\n\n\n\n\nLanguages Support\n\n\nMigrating to a new platform often requires re-use of the existing code that would be diff
 icult or time-consuming to re-write.  With this in mind, Malhar supports invocation of code written in other languages by wrapping them in one of the library operators, and allows execution of software written in:\n\n\n\n\nJavaScript\n\n\nPython\n\n\nR\n\n\nRuby", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#apache-apex-malhar", 
+            "text": "Apache Apex Malhar is an open source operator and codec library that can be used with the  Apache Apex  platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of example applications, demonstrating operator features and capabilities.", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#capabilities-common-across-malhar-operators", 
+            "text": "For most streaming platforms, connectors are afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the platform. As a result they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have following core streaming runtime capabilities   Fault tolerance  \u2013 Malhar operators where applicable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario.  Processing guarantees  \u2013 Malhar operators where applicable provide out of the box support for ALL three processing guarantees \u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code.  Some operators, like MQTT operator, deal with source systems that can not track processed data and hence n
 eed the operators to keep track of the data.  Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this.  Finally for databases that support transactions or support any sort of atomic batch operations Malhar operators can do exactly once down to the tuple level.  Dynamic updates  \u2013 Based on changing business conditions you often have to tweak several parameters used by the operators in your streaming application without incurring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application.  Ease of extensibility  \u2013 Malhar operators are based on templates that are easy to extend.  Partitioning support  \u2013 In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system.  Fo
 r example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.", 
+            "title": "Capabilities common across Malhar operators"
+        }, 
+        {
+            "location": "/#operator-library-overview", 
+            "text": "", 
+            "title": "Operator Library Overview"
+        }, 
+        {
+            "location": "/#inputoutput-connectors", 
+            "text": "Below is a summary of the various sub categories of input and output operators. Input operators also have a corresponding output operator   File Systems  \u2013 Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the application is running in AWS.  Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share.  Apex supports input   output operators for HDFS, S3, NFS   Local Files.  There are also File Splitter and Block Reader operators, which can accelecate processing of large files by splitting and paralellizing the work across non-overlapping sets of file blocks.  Relational Databases  \u2013 Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational das
 hboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.  NoSQL Databases  \u2013 NoSQL key-value pair databases like Cassandra   HBase are a common part of streaming analytics application architectures to lookup reference data or store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB.  Messaging Systems  \u2013 Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises.  Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages.  Notification Systems  \u2013 Malhar includes an operator for sending notifications via SMTP.  In-memory Databases   Caching platforms  - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar
  has operators for memcached and Redis.  Social Media  - Malhar includes an operator to connect to the popular Twitter stream fire hose.  Protocols  - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and MQTT.", 
+            "title": "Input/output connectors"
+        }, 
+        {
+            "location": "/#parsers", 
+            "text": "There are many industry vertical specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example in the Telco space, a Java based CDR parser can be directly plugged into Apache Apex operator. To further simplify development experience, Malhar also provides some operators for parsing common formats like XML (DOM   SAX), JSON (flat map converter), Apache log files, syslog, etc.", 
+            "title": "Parsers"
+        }, 
+        {
+            "location": "/#stream-manipulation", 
+            "text": "Streaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW.  Malhar has several operators to perform the common stream manipulation actions like \u2013 GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Outer join, Select, Update etc.", 
+            "title": "Stream manipulation"
+        }, 
+        {
+            "location": "/#compute", 
+            "text": "One of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support variety of common compute functions as the user then has to worry about making these scalable, fault tolerant, stateful, etc.  Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators.  Below is just a snapshot of the compute operators available in Malhar   Statistics and math - Various mathematical and statistical computations over application defined time windows.  Filtering and pattern matching  Sorting, maps, frequency, TopN, BottomN  Random data generators", 
+            "title": "Compute"
+        }, 
+        {
+            "location": "/#languages-support", 
+            "text": "Migrating to a new platform often requires re-use of the existing code that would be difficult or time-consuming to re-write.  With this in mind, Malhar supports invocation of code written in other languages by wrapping them in one of the library operators, and allows execution of software written in:   JavaScript  Python  R  Ruby", 
+            "title": "Languages Support"
+        }, 
+        {
+            "location": "/apis/calcite/", 
+            "text": "Apache Apex is a unified stream and batch processing engine that enables application developers to process data at very high throughput with low latency. Although the different types of data have different processing needs, SQL remains a popular and a generic way for processing data. To ensure that existing ETL developers and developers who are well versed with Database applications adopt stream processing application development with ease, integration of SQL with Apex was needed. Being a popular Apache project, Apache Calcite was chosen for this purpose and its integration with Apex is described below.\n\n\nApex-Calcite Integration\n\n\nApache Calcite is a highly customizable engine for parsing and planning queries on relational data from various data sources; it provides storage independent optimization of queries and ways to integrate them into other frameworks which would like to take advantage and expose SQL capability to their users. For details, please re
 ad at \nApache Calcite Website\n. \n\n\nParticularly in SQL on Apex, Calcite processes a query and then creates relational algebra to create processing pipelines. These relational algebra processing pipelines are converted to a DAG with a set of operators to perform business logic on streaming data.\n\n\n\n\nAbove figure explains how SQL query gets converted to Apex DAG.\n\n\n\n\nUser specified query is processed by Calcite Query planner; this involves parsing and optimizing the query to generate Relation Expression Tree. \n\n\nThis Relation Expression Tree is received by Apache Apex\u2019s SQL module to finally convert to an Apex DAG having series of operators.\n\n\n\n\nOne peculiarity of Calcite queries is that the data source and destination need not be RDBMS systems; in the above example, \nFile\n refers to a file in the filesystem and \nKafka\n to a Kafka message broker. Calcite allows Apex to register table sources and destinations as anything which can return a row type resul
 ts. So a \u201cscan\u201d relational expression gets converted to \u201cKafkaInputOperator + ParseOperator\u201d, a result of which is series of POJOs reflecting a Row Type. Similarly, the \u201cinsert\u201d Relational Expression translated to \u201cFormatOperator + FileOutputOperator\u201d.\n\n\nFor more details about the integration, click \nhere\n.\n\n\nSQL APIs for Apache Apex\n\n\nListed below are the Java APIs which can be used by SQL/Apex users to create a DAG in the implementation of the \npopulateDAG\n method of the \nStreamingApplication\n interface.\n\n\n\n\n\n\n\n\nAPI\n\n\nDescription\n\n\n\n\n\n\n\n\n\n\nSQLExecEnvironment.getEnvironment()\n\n\nCreates a new SQL execution environment\n\n\n\n\n\n\nSQLExecEnvironment.registerTable(tableName, endpointInstance)\n\n\nRegisters a new abstract table with existing environment. \nendpointInstance\n is an object of type \nEndpoint\n which defines a table.\n\n\n\n\n\n\nSQLExecEnvironment.registerFunction(sqlFunctionName, holderCl
 ass, staticFunctionName)\n\n\nRegisters a new User Defined Scalar function\n\n\n\n\n\n\nSQLExecEnvironment.executeSQL(dag, sqlStatement)\n\n\nCreates a DAG for a particular SQL statement\n\n\n\n\n\n\n\n\nUsage of above APIs is described in detail in following sections.\n\n\nExample 1: Pure Style SQL Application\n\n\nWith Apache Calcite Integration, you can use SQL queries across different data sources and provide UDFs (User Defined Functions) as per your business logic. This example will use a Kafka topic as the source and a HDFS file as the destination.\nFollowing application code will be used to explain APIs. Actual source code can be found \nhere\n.\n\n\n  public class PureStyleSQLApplication implements StreamingApplication\n  {\n    @Override\n    public void populateDAG(DAG dag, Configuration conf)\n    {\n       // Create new SQLExecEnvironment\n       SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n\n      // This is a string that defines a schema and is dis
 cussed in more detail in \nRegistering tables with SQLExecEnvironment\n section \n      String inputSchemaString = \n...\n;\n\n      // similar to inputSchemaString, we also need to define outputSchemaString\n      String outputSchemaString = \n...\n;\n\n       // Register KafkaEnpoint as \nORDERS\n table with kafka topic and data format as CSV\n       sqlEnv = sqlEnv.registerTable( \n                                    \nORDERS\n, \n                                    new KafkaEndpoint(\nlocalhost:9090\n, \n                                                      \ninputTopic\n, \n                                                      new CSVMessageFormat(inputSchemaString))\n                                  );\n\n       // Register FileEndpoint as \nSALES\n table with file path and data format as CSV\n       sqlEnv = sqlEnv.registerTable( \n                                    \nSALES\n, \n                                    new FileEndpoint(\n/tmp/output\n, \n                        
                              \nout.file\n, \n                                                     new CSVMessageFormat(outputSchemaString))\n                                  );\n\n       // Register scalar SQL UDF \n       sqlEnv = sqlEnv.registerFunction(\nAPEXCONCAT\n, PureStyleSQLApplication.class, \napex_concat_str\n);\n\n       // Converting SQL statement to DAG \n       String sql = \nINSERT INTO SALES \n                       SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n                       FROM ORDERS \n                       WHERE ID \n 3 AND PRODUCT LIKE 'paint%'\n;\n       sqlEnv.executeSQL(dag, sql);\n    }// populateDAG finished\n\n    public static String apex_concat_str(String s1, String s2)\n    {\n        return s1 + s2;\n    } \n  }\n\n\n\n\nConstructing SQLExecEnvironment\n\n\nThe class \nSQLExecEnvironment\n provides a starting point and a simple way to define metadata needed for running a SQL statement; a ne
 w instance of this class is returned by the \ngetEnvironment\n static method.  \n\n\n  // Creates SQLExecEnvironment instance by using static method getEnvironment\n  SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n\n\n\n\nRegistering tables with SQLExecEnvironment\n\n\nNext, we need to register tables which can be used in a query. For this purpose, we can use \nregisterTable\n method from SQLExecEnvironment.\n\n\n  // Register KafkaEnpoint as \nORDERS\n table with kafka topic and data format as CSV\n  sqlEnv = sqlEnv.registerTable( \n                              \nORDERS\n, \n                              new KafkaEndpoint(\nlocalhost:9090\n, \n                                                \ninputTopic\n, \n                                                new CSVMessageFormat(inputSchemaString))\n                            );\n\n  // Register FileEndpoint as \nSALES\n table with file path and data format as CSV\n  sqlEnv = sqlEnv.registerTable( \n              
                 \nSALES\n, \n                              new FileEndpoint(\n/tmp/output\n, \n                                               \nout.file\n, \n                                               new CSVMessageFormat(inputSchemaString))\n                            );\n\n\n\n\n\"registerTable\"\n method takes the name of the table and an instance of endpoint as parameters. Endpoint signifies data storage mechanism and type of source/destination for the data. These endpoints require different types of configurations and possibly data formats. The data format is defined using an implementation of the \nMessageFormat\n interface; the \nCSVMessageFormat\n implementation can be configured with a schema string as follows:\n\n\n{\n  \nseparator\n: \n,\n,\n  \nquoteChar\n: \n\\\n,\n  \nfields\n: [\n    {\n      \nname\n: \nRowTime\n,\n      \ntype\n: \nDate\n,\n      \nconstraints\n: {\n        \nformat\n: \ndd/MM/yyyy hh:mm:ss Z\n\n      }\n    },\n    {\n      \nname\n: \nid\n,\n
       \ntype\n: \nInteger\n\n    },\n    {\n      \nname\n: \nProduct\n,\n      \ntype\n: \nString\n\n    },\n    {\n      \nname\n: \nunits\n,\n      \ntype\n: \nInteger\n\n    }\n  ]\n}\n\n\n\n\nThe schema string is a JSON string defining a separator character, quote character for fields with String type and a list of fields where, for each field, its name, type and any additional constraints are specified.\n\n\nThe following data endpoints are supported: \n\n\n\n\nKafkaEndpoint\n\n: To define a Kafka Endpoint we need to specify the Kafka broker (as host:port), topic name and MessageFormat as seen in line 1 in the code above.\n\n\nFileEndpoint\n\n: It needs to be configured with the filesystem path, file name and MessageFormat as in line 2 in the code above. \n\n\nStreamEndpoint\n \n: This allows us to connect existing operator output or input ports to the SQL query as a data source or sink respectively. StreamEndpoint needs the immediate downstream operator's input port or the immediate upstr
 eam operator's output port and the field mapping for CSV data or POJO class. This will be explained in detail in the next \nexample\n.\n\n\nUsing User Defined Functions (UDF) in a SQL query\n\n\nWe can use our own scalar UDF, implemented in Java, in a SQL statement for data manipulation but first, we need to register the function with the execution environment by using the \nregisterFunction\n method.\n\n\n  sqlEnv = sqlEnv.registerFunction(\nAPEXCONCAT\n, PureStyleSQLApplication.class, \napex_concat_str\n);\n\n\n\n\nIn the above code, \nregisterFunction\n takes the UDF name to be used in SQL, the Java class which implements the static method, and the name of that method as parameters. \nThe static method \napex_concat_str\n takes two String objects as input parameters from the SQL query.\n\n\n  public static String apex_concat_str(String s1, String s2)\n  {\n    return s1 + s2;\n  }\n\n\n\n\nThe scalar UDF \"APEXCONCAT\" that was registered above can be used in SQL as described below. FLOOR and
  SUBSTRING are standard SQL scalar functions supported by Apache Calcite.\n\n\nINSERT INTO SALES \n       SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n       FROM ORDERS \n       WHERE ID \n 3 AND PRODUCT LIKE 'paint%'\n\n\n\n\nTo read about all functions and operators supported by Apache Calcite, click \nhere\n.\n\n\nExecuting SQL Query\n\n\nFinally to execute the query we need to use \nexecuteSQL\n function with a DAG and SQL statement as parameters.\n\n\n  // Converting SQL statement to DAG \n  String sql = \nINSERT INTO SALES \n                SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n                FROM ORDERS \n                WHERE ID \n 3 AND PRODUCT LIKE 'paint%'\n;\n  sqlEnv.executeSQL(dag, sql);\n\n\n\n\nWhen executeSQL method is called, the query goes through various phases like conversion to relational algebra, optimization and planning in Calcite to generate Rela
 tion Expression Tree. \nThe generated Relation Expression Tree is consumed by Apex SQL and converted to a DAG using operators available in Apache Malhar. In the above example, the ORDERS and SALES tables will be converted to the operators KafkaInputOperator and FileOutputFormatter respectively, paired with the CSVParser formatter in both cases.\n\n\nA \nWHERE\n clause is used in this query; it defines the desired filter for rows and is converted to a \nFilterTransformOperator\n in the DAG. Similarly, the projection defining desired columns is converted into another instance of the \nFilterTransformOperator\n. The DAG created for this application will look like this:\n\n\n\n\n\n\nExample 2: Fusion Style SQL Application\n\n\nAs described in Pure Style SQL application, we can use different data sources as source and sink while developing Apex Applications with Calcite. This example will describe how we can develop Apex application with Apex stream as abstract table for SQL query. Actua
 l source code can be found \nhere\n.\n\n\n  // Define Kafka Input Operator for reading data from Kafka\n  KafkaSinglePortInputOperator kafkaInput = dag.addOperator(\nKafkaInput\n, \n                                                           KafkaSinglePortInputOperator.class);\n\n  kafkaInput.setInitialOffset(\nEARLIEST\n);\n\n  // Add CSVParser\n  CsvParser csvParser = dag.addOperator(\nCSVParser\n, CsvParser.class);\n  dag.addStream(\nKafkaToCSV\n, kafkaInput.outputPort, csvParser.in);\n\n\n\n\nOnce we define the DAG with KafkaInputOperator and CSVParser, it can parse data from the Kafka topic. Up to this point, this is a regular Apex application without SQL. After this, we can register the output of CSVParser as a table using \nStreamEndpoint\n to run a SQL statement. This way we can develop applications in fusion style where the DAG is part SQL and part regular Apex DAG.\n\n\nThe following code will describe how we can define StreamEndpoint. \n\n\n  SQLExecEnvironment sqlEnv = SQLExecEnvironment.get
 Environment();\n  Map\nString, Class\n fieldMapping = ImmutableMap.\nString, Class\nof(\nRowTime\n, Date.class,\n                                                                 \nid\n, Integer.class,\n                                                                 \nProduct\n, String.class,\n                                                                 \nunits\n, Integer.class);\n  sqlEnv = sqlEnv.registerTable(\nFROMCSV\n, new StreamEndpoint(csvParser.out, fieldMapping));\n\n\n\n\nTo read existing data stream, we need to register it as a table with SQL execution environment with the name of the table and StreamEndpoint. StreamEndpoint can serve as input table or output table in SQL. For input table configuration we need to initialise StreamEndpoint with immediate upstream operator's output port and fieldMapping or POJO class for input tuple(as shown above). For output table configuration, we need to initialise StreamEndpoint with immediate downstream operator's input port and 
 fieldMapping or POJO class for output tuple. Once we register StreamEndpoint as a table with a name in the SQL Execution Environment, it can be used as a table in a SQL statement similar to other endpoints.\n\n\nWhen the executeSQL method is called, the specified SQL is converted to a DAG as described in the previous section. Both examples read CSV data from Kafka. But in the pure style SQL example the \nKafkaInputOperator\n and \nCSVParser\n in the DAG are created implicitly by the use of the KafkaEndpoint, while in the fusion style example, they are explicitly defined as part of the DAG which is then extended with other operators as shown in the image below. \n\n\n\n\nFor all Apex-Calcite integration examples, click \nhere\n. \n\n\nOngoing efforts\n\n\nApache Apex-Calcite integration provides support for basic queries and efforts are underway to extend support for aggregations, sorting and other features using Tumbling, Hopping and Session Windows.\nSupport for JSON, XML and JDBC endpoin
 ts is also planned. The goal of this integration is to make developing a streaming application using SQL easy so that SQL Developers don't have to write any Java code at all.", 
+            "title": "SQL"
+        }, 
+        {
+            "location": "/apis/calcite/#apex-calcite-integration", 
+            "text": "Apache Calcite is a highly customizable engine for parsing and planning queries on relational data from various data sources; it provides storage independent optimization of queries and ways to integrate them into other frameworks which would like to take advantage and expose SQL capability to their users. For details, please read at  Apache Calcite Website .   Particularly in SQL on Apex, Calcite processes a query and then creates relational algebra to create processing pipelines. These relational algebra processing pipelines are converted to a DAG with a set of operators to perform business logic on streaming data.   Above figure explains how SQL query gets converted to Apex DAG.   User specified query is processed by Calcite Query planner; this involves parsing and optimizing the query to generate Relation Expression Tree.   This Relation Expression Tree is received by Apache Apex\u2019s SQL module to finally convert to an Apex DAG having series of operators.
    One peculiarity of Calcite queries is that the data source and destination need not be RDBMS systems; in the above example,  File  refers to a file in the filesystem and  Kafka  to a Kafka message broker. Calcite allows Apex to register table sources and destinations as anything which can return a row type results. So a \u201cscan\u201d relational expression gets converted to \u201cKafkaInputOperator + ParseOperator\u201d, a result of which is series of POJOs reflecting a Row Type. Similarly, the \u201cinsert\u201d Relational Expression translated to \u201cFormatOperator + FileOutputOperator\u201d.  For more details about the integration, click  here .", 
+            "title": "Apex-Calcite Integration"
+        }, 
+        {
+            "location": "/apis/calcite/#sql-apis-for-apache-apex", 
+            "text": "Listed below are the Java APIs which can be used by SQL/Apex users to create a DAG in the implementation of the  populateDAG  method of the  StreamingApplication  interface.     API  Description      SQLExecEnvironment.getEnvironment()  Creates a new SQL execution environment    SQLExecEnvironment.registerTable(tableName, endpointInstance)  Registers a new abstract table with existing environment.  endpointInstance  is an object of type  Endpoint  which defines a table.    SQLExecEnvironment.registerFunction(sqlFunctionName, holderClass, staticFunctionName)  Registers a new User Defined Scalar function    SQLExecEnvironment.executeSQL(dag, sqlStatement)  Creates a DAG for a particular SQL statement     Usage of above APIs is described in detail in following sections.", 
+            "title": "SQL APIs for Apache Apex"
+        }, 
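
As a rough illustration of how the four APIs in the table above fit together, the sketch below chains them inside populateDAG. It reuses the ORDERS/SALES endpoints, schema strings, UDF and SQL text from Example 1 on this page, and assumes that each register call returns the environment, as the sqlEnv assignments in that example suggest.

    // Sketch only: the endpoint arguments, schema strings and sql text are the ones shown in Example 1.
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      SQLExecEnvironment.getEnvironment()
          .registerTable("ORDERS", new KafkaEndpoint("localhost:9090", "inputTopic",
              new CSVMessageFormat(inputSchemaString)))
          .registerTable("SALES", new FileEndpoint("/tmp/output", "out.file",
              new CSVMessageFormat(outputSchemaString)))
          .registerFunction("APEXCONCAT", PureStyleSQLApplication.class, "apex_concat_str")
          .executeSQL(dag, sql); // sql is the INSERT INTO SALES ... SELECT STREAM ... statement
    }
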
+        {
+            "location": "/apis/calcite/#example-1-pure-style-sql-application", 
+            "text": "With Apache Calcite Integration, you can use SQL queries across different data sources and provide UDFs (User Defined Functions) as per your business logic. This example will use a Kafka topic as the source and a HDFS file as the destination.\nFollowing application code will be used to explain APIs. Actual source code can be found  here .    public class PureStyleSQLApplication implements StreamingApplication\n  {\n    @Override\n    public void populateDAG(DAG dag, Configuration conf)\n    {\n       // Create new SQLExecEnvironment\n       SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n\n      // This is a string that defines a schema and is discussed in more detail in  Registering tables with SQLExecEnvironment  section \n      String inputSchemaString =  ... ;\n\n      // similar to inputSchemaString, we also need to define outputSchemaString\n      String outputSchemaString =  ... ;\n\n       // Register KafkaEnpoint as  ORDERS  table with
  kafka topic and data format as CSV\n       sqlEnv = sqlEnv.registerTable( \n                                     ORDERS , \n                                    new KafkaEndpoint( localhost:9090 , \n                                                       inputTopic , \n                                                      new CSVMessageFormat(inputSchemaString))\n                                  );\n\n       // Register FileEndpoint as  SALES  table with file path and data format as CSV\n       sqlEnv = sqlEnv.registerTable( \n                                     SALES , \n                                    new FileEndpoint( /tmp/output , \n                                                      out.file , \n                                                     new CSVMessageFormat(outputSchemaString))\n                                  );\n\n       // Register scalar SQL UDF \n       sqlEnv = sqlEnv.registerFunction( APEXCONCAT , PureStyleSQLApplication.class,  apex_concat_str );\n\n
        // Converting SQL statement to DAG \n       String sql =  INSERT INTO SALES \n                       SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n                       FROM ORDERS \n                       WHERE ID   3 AND PRODUCT LIKE 'paint%' ;\n       sqlEnv.executeSQL(dag, sql);\n    }// populateDAG finished\n\n    public static String apex_concat_str(String s1, String s2)\n    {\n        return s1 + s2;\n    } \n  }", 
+            "title": "Example 1: Pure Style SQL Application"
+        }, 
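
To try an application like PureStyleSQLApplication outside a cluster, one option is Apex's embedded mode. The sketch below is only an assumption-based example: it presumes the standard com.datatorrent.api.LocalMode utility from apex-core and a Kafka broker reachable at localhost:9090.

    // Minimal sketch: run the application in embedded (local) mode for a few seconds.
    // Uses com.datatorrent.api.LocalMode and org.apache.hadoop.conf.Configuration.
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    lma.prepareDAG(new PureStyleSQLApplication(), conf);  // invokes populateDAG shown above
    LocalMode.Controller lc = lma.getController();
    lc.run(10000);                                        // run for ~10 seconds, then shut down
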
+        {
+            "location": "/apis/calcite/#constructing-sqlexecenvironment", 
+            "text": "The class  SQLExecEnvironment  provides a starting point and a simple way to define metadata needed for running a SQL statement; a new instance of this class is returned by the  getEnvironment  static method.      // Creates SQLExecEnvironment instance by using static method getEnvironment\n  SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();", 
+            "title": "Constructing SQLExecEnvironment"
+        }, 
+        {
+            "location": "/apis/calcite/#registering-tables-with-sqlexecenvironment", 
+            "text": "Next, we need to register tables which can be used in a query. For this purpose, we can use  registerTable  method from SQLExecEnvironment.    // Register KafkaEnpoint as  ORDERS  table with kafka topic and data format as CSV\n  sqlEnv = sqlEnv.registerTable( \n                               ORDERS , \n                              new KafkaEndpoint( localhost:9090 , \n                                                 inputTopic , \n                                                new CSVMessageFormat(inputSchemaString))\n                            );\n\n  // Register FileEndpoint as  SALES  table with file path and data format as CSV\n  sqlEnv = sqlEnv.registerTable( \n                               SALES , \n                              new FileEndpoint( /tmp/output , \n                                                out.file , \n                                               new CSVMessageFormat(inputSchemaString))\n                            );  \"registerT
 able\"  method takes the name of the table and an instance of endpoint as parameters. Endpoint signifies data storage mechanism and type of source/destination for the data. These endpoints require different types of configurations and possibly data formats. The data format is defined using an implementation of the  MessageFormat  interface; the  CSVMessageFormat  implementation can be configured with a schema string as follows:  {\n   separator :  , ,\n   quoteChar :  \\ ,\n   fields : [\n    {\n       name :  RowTime ,\n       type :  Date ,\n       constraints : {\n         format :  dd/MM/yyyy hh:mm:ss Z \n      }\n    },\n    {\n       name :  id ,\n       type :  Integer \n    },\n    {\n       name :  Product ,\n       type :  String \n    },\n    {\n       name :  units ,\n       type :  Integer \n    }\n  ]\n}  The schema string is a JSON string defining a separator character, quote character for fields with String type and a list of fields where, for each field, its name, t
 ype and any additional constraints are specified.  The following data endpoints are supported:    KafkaEndpoint \n: To define a Kafka Endpoint we need to specify the Kafka broker (as host:port), topic name and MessageFormat as seen in line 1 in the code above.  FileEndpoint \n: It needs to be configured with the filesystem path, file name and MessageFormat as in line 2 in the code above.   StreamEndpoint  \n: This allows us to connect existing operator output or input ports to the SQL query as a data source or sink respectively. StreamEndpoint needs the immediate downstream operator's input port or the immediate upstream operator's output port and the field mapping for CSV data or POJO class. This will be explained in detail in the next  example .", 
+            "title": "Registering tables with SQLExecEnvironment"
+        }, 
+        {
+            "location": "/apis/calcite/#using-user-defined-functions-udf-in-a-sql-query", 
+            "text": "We can use our own scalar UDF, implemented in Java, in a SQL statement for data manipulation but first, we need to register the function with the execution environment by using the  registerFunction  method.    sqlEnv = sqlEnv.registerFunction( APEXCONCAT , PureStyleSQLApplication.class,  apex_concat_str );  In above code,  registerFunction  takes the UDF name to be used in SQL, JAVA class which implements the static method and name of that method as parameters. \nThe static method  apex_concat_str  takes two String objects as input parameters from the SQL query.    public static String apex_concat_str(String s1, String s2)\n  {\n    return s1 + s2;\n  }  The scalar UDF \"APEXCONCAT\" that was registered above can be used in SQL as described below. FLOOR and SUBSTRING are standard SQL scalar functions supported by Apache Calcite.  INSERT INTO SALES \n       SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n       FR
 OM ORDERS \n       WHERE ID   3 AND PRODUCT LIKE 'paint%'  To read about all functions and operators supported by Apache Calcite, click  here .", 
+            "title": "Using User Defined Functions (UDF) in a SQL query"
+        }, 
+        {
+            "location": "/apis/calcite/#executing-sql-query", 
+            "text": "Finally to execute the query we need to use  executeSQL  function with a DAG and SQL statement as parameters.    // Converting SQL statement to DAG \n  String sql =  INSERT INTO SALES \n                SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n                FROM ORDERS \n                WHERE ID   3 AND PRODUCT LIKE 'paint%' ;\n  sqlEnv.executeSQL(dag, sql);  When executeSQL method is called, the query goes through various phases like conversion to relational algebra, optimization and planning in Calcite to generate Relation Expression Tree. \nThe generated Relation Expression Tree is consumed by Apex SQL and converted to a DAG using operators available in Apache Malhar. In the above example, the ORDERS and SALES tables will be converted to the operators KafkaInputOperator and FileOutputFormatter respectively, paired with the CSVParser formatter in both cases.  A  WHERE  clause is used in this query; it de
 fines the desired filter for rows and is converted to a  FilterTransformOperator  in the DAG. Similarly, the projection defining desired columns is converted into another instance of the  FilterTransformOperator . The DAG created for this application will look like this:", 
+            "title": "Executing SQL Query"
+        }, 
+        {
+            "location": "/apis/calcite/#example-2-fusion-style-sql-application", 
+            "text": "As described in Pure Style SQL application, we can use different data sources as source and sink while developing Apex Applications with Calcite. This example will describe how we can develop Apex application with Apex stream as abstract table for SQL query. Actual source code can be found  here .    // Define Kafka Input Operator for reading data from Kafka\n  KafkaSinglePortInputOperator kafkaInput = dag.addOperator( KafkaInput , \n                                                           KafkaSinglePortInputOperator.class);\n\n  kafkaInput.setInitialOffset( EARLIEST );\n\n  // Add CSVParser\n  CsvParser csvParser = dag.addOperator( CSVParser , CsvParser.class);\n  dag.addStream( KafkaToCSV , kafkaInput.outputPort, csvParser.in);  Once we define DAG with KafkaInputOperator and CSVParser, it can parse data from Kafka topic. Upto this point, this is a regular Apex application without SQL. After this, we can register the output of CSVParser as a table using  Str
 eamEndpoint  to run a SQL statement. This way we can develop applications in fusion style where the DAG is part SQL and part regular Apex DAG.  The following code will describe how we can define StreamEndpoint.     SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n  Map String, Class  fieldMapping = ImmutableMap. String, Class of( RowTime , Date.class,\n                                                                  id , Integer.class,\n                                                                  Product , String.class,\n                                                                  units , Integer.class);\n  sqlEnv = sqlEnv.registerTable( FROMCSV , new StreamEndpoint(csvParser.out, fieldMapping));  To read an existing data stream, we need to register it as a table with the SQL execution environment with the name of the table and StreamEndpoint. StreamEndpoint can serve as an input table or an output table in SQL. For input table configuration, we need to initialise StreamEndpoint w
 ith the immediate upstream operator's output port and fieldMapping or POJO class for input tuple (as shown above). For output table configuration, we need to initialise StreamEndpoint with the immediate downstream operator's input port and fieldMapping or POJO class for output tuple. Once we register StreamEndpoint as a table with a name in the SQL Execution Environment, it can be used as a table in a SQL statement similar to other endpoints.  When the executeSQL method is called, the specified SQL is converted to a DAG as described in the previous section. Both examples read CSV data from Kafka. But in the pure style SQL example the  KafkaInputOperator  and  CSVParser  in the DAG are created implicitly by the use of the KafkaEndpoint, while in the fusion style example, they are explicitly defined as part of the DAG which is then extended with other operators as shown in the image below.    For all Apex-Calcite integration examples, click  here .", 
+            "title": "Example 2: Fusion Style SQL Application"
+        }, 
+        {
+            "location": "/apis/calcite/#ongoing-efforts", 
+            "text": "Apache Apex-Calcite integration provides support for basic queries and efforts are underway to extend support for aggregations, sorting and other features using Tumbling, Hopping and Session Windows.\nSupport for JSON, XML and JDBC endpoint are also planned. The goal of this integration is to make developing a streaming application using SQL easy so that SQL Developers don't have to write any java code at all.", 
+            "title": "Ongoing efforts"
+        }, 
+        {
+            "location": "/operators/block_reader/", 
+            "text": "Block Reader\n\n\nThis is a scalable operator that reads and parses blocks of data sources into records. A data source can be a file or a message bus that contains records and a block defines a chunk of data in the source by specifying the block offset and the length of the source belonging to the block. \n\n\nWhy is it needed?\n\n\nA Block Reader is needed to parallelize reading and parsing of a single data source, for example a file. Simple parallelism of reading data sources can be achieved by multiple partitions reading different source of same type (for files see \nAbstractFileInputOperator\n) but Block Reader partitions can read blocks of same source in parallel and parse them for records ensuring that no record is duplicated or missed.\n\n\nClass Diagram\n\n\n\n\nAbstractBlockReader\n\n\nThis is the abstract implementation that serves as the base for different types of data sources. It defines how a block metadata is processed. The flow diagram below desc
 ribes the processing of a block metadata.\n\n\n\n\nPorts\n\n\n\n\n\n\nblocksMetadataInput: input port on which block metadata are received.\n\n\n\n\n\n\nblocksMetadataOutput: output port on which block metadata are emitted if the port is connected. This port is useful when a downstream operator that receives records from block reader may also be interested to know the details of the corresponding blocks.\n\n\n\n\n\n\nmessages: output port on which tuples of type \ncom.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord\n are emitted. This class encapsulates a \nrecord\n and the \nblockId\n of the corresponding block.\n\n\n\n\n\n\nreaderContext\n\n\nThis is one of the most important fields in the block reader. It is of type \ncom.datatorrent.lib.io.block.ReaderContext\n and is responsible for fetching bytes that make a record. It also lets the reader know how many total bytes were consumed which may not be equal to the total bytes in a record because consumed bytes also include
  bytes for the record delimiter which may not be a part of the actual record.\n\n\nOnce the reader creates an input stream for the block (or uses the previous opened stream if the current block is successor of the previous block) it initializes the reader context by invoking \nreaderContext.initialize(stream, blockMetadata, consecutiveBlock);\n. Initialize method is where any implementation of \nReaderContext\n can perform all the operations which have to be executed just before reading the block or create states which are used during the lifetime of reading the block.\n\n\nOnce the initialization is done, \nreaderContext.next()\n is called repeatedly until it returns \nnull\n. It is left to the \nReaderContext\n implementations to decide when a block is completely processed. In cases when a record is split across adjacent blocks, reader context may decide to read ahead of the current block boundary to completely fetch the split record (examples- \nLineReaderContext\n and \nReadAhea
 dLineReaderContext\n). In other cases when there isn't a possibility of split record (example- \nFixedBytesReaderContext\n), it returns \nnull\n immediately when the block boundary is reached. The return type of \nreaderContext.next()\n is of type \ncom.datatorrent.lib.io.block.ReaderContext.Entity\n which is just a wrapper for a \nbyte[]\n that represents the record and total bytes used in fetching the record.\n\n\nAbstract methods\n\n\n\n\n\n\nSTREAM setupStream(B block)\n: creating a stream for a block is dependent on the type of source which is not known to AbstractBlockReader. Sub-classes which deal with a specific data source provide this implementation.\n\n\n\n\n\n\nR convertToRecord(byte[] bytes)\n: this converts the array of bytes into the actual instance of record type.\n\n\n\n\n\n\nAuto-scalability\n\n\nBlock reader can auto-scale, that is, depending on the backlog (total number of all the blocks which are waiting in the \nblocksMetadataInput\n port queue of all partition
 s) it can create more partitions or reduce them. Details are discussed in the last section which covers the \npartitioner and stats-listener\n.\n\n\nConfiguration\n\n\n\n\nmaxReaders\n: when auto-scaling is enabled, this controls the maximum number of block reader partitions that can be created.\n\n\nminReaders\n: when auto-scaling is enabled, this controls the minimum number of block reader partitions that should always exist.\n\n\ncollectStats\n: this enables or disables auto-scaling. When it is set to \ntrue\n the stats (number of blocks in the queue) are collected and this triggers partitioning; otherwise auto-scaling is disabled.\n\n\nintervalMillis\n: when auto-scaling is enabled, this specifies the interval at which the reader will trigger the logic of computing the backlog and auto-scale.\n\n\n\n\n AbstractFSBlockReader\n\n\nThis abstract implementation deals with files. Different types of file systems that are implementations of \norg.apache.hadoop.fs.FileSystem\n are suppo
 rted. The user can override the \ngetFSInstance()\n method to create an instance of a specific \nFileSystem\n. By default, the filesystem instance is created from the filesystem URI that comes from the default Hadoop configuration.\n\n\nprotected FileSystem getFSInstance() throws IOException\n{\n  return FileSystem.newInstance(configuration);\n}\n\n\n\n\nIt uses this filesystem instance to set up a stream of type \norg.apache.hadoop.fs.FSDataInputStream\n to read the block.\n\n\n@Override\nprotected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException\n{\n  return fs.open(new Path(block.getFilePath()));\n}\n\n\n\n\nAll the ports and configurations are derived from the super class. It doesn't provide an implementation of the \nconvertToRecord(byte[] bytes)\n method, which is delegated to concrete sub-classes.\n\n\nExample Application\n\n\nThis simple DAG demonstrates how any concrete implementation of \nAbstractFSBlockReader\n can be plugged into an application. \
 n\n\n\n\nIn the above application, the file splitter creates block metadata for files which are sent to the block reader. Partitions of the block reader parse the file blocks for records which are filtered, transformed and then persisted to a file (created per block). Therefore, the block reader is parallel partitioned with the two downstream operators - filter/converter and record output operator. The code which implements this DAG is below.\n\n\npublic class ExampleApplication implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    FileSplitterInput input = dag.addOperator(\nFile-splitter\n, new FileSplitterInput());\n    //any concrete implementation of AbstractFSBlockReader based on the use-case can be added here.\n    LineReader blockReader = dag.addOperator(\nBlock-reader\n, new LineReader());\n    Filter filter = dag.addOperator(\nFilter\n, new Filter());\n    RecordOutputOperator recordOutputOperator = dag.addOperator(\nRe
 cord-writer\n, new RecordOutputOperator());\n\n    dag.addStream(\nfile-block metadata\n, input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    dag.addStream(\nrecords\n, blockReader.messages, filter.input);\n    dag.addStream(\nfiltered-records\n, filter.output, recordOutputOperator.input);\n  }\n\n  /**\n   * Concrete implementation of {@link AbstractFSBlockReader} for which a record is a line in the file.\n   */\n  public static class LineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader\nString\n\n  {\n\n    @Override\n    protected String convertToRecord(byte[] bytes)\n    {\n      return new String(bytes);\n    }\n  }\n\n  /**\n   * Considers any line starting with a '.' as invalid. Emits the valid records.\n   */\n  public static class Filter extends BaseOperator\n  {\n    public final transient DefaultOutputPort\nAbstractBlockReader.ReaderRecord\nString\n output = new DefaultOutputPort\n();\n    public final transient DefaultInputPort\nAbstractB
 lockReader.ReaderRecord\nString\n input = new DefaultInputPort\nAbstractBlockReader.ReaderRecord\nString\n()\n    {\n      @Override\n      public void process(AbstractBlockReader.ReaderRecord\nString\n stringRecord)\n      {\n        //filter records and transform\n        //if the string starts with a '.' ignore the string.\n        if (!StringUtils.startsWith(stringRecord.getRecord(), \n.\n)) {\n          output.emit(stringRecord);\n        }\n      }\n    };\n  }\n\n  /**\n   * Persists the valid records to corresponding block files.\n   */\n  public static class RecordOutputOperator extends AbstractFileOutputOperator\nAbstractBlockReader.ReaderRecord\nString\n\n  {\n    @Override\n    protected String getFileName(AbstractBlockReader.ReaderRecord\nString\n tuple)\n    {\n      return Long.toHexString(tuple.getBlockId());\n    }\n\n    @Override\n    protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord\nString\n tuple)\n    {\n      return tuple.getRecord().getBytes
 ();\n    }\n  }\n}\n\n\n\n\nConfiguration to parallel partition block reader with its downstream operators.\n\n\n  \nproperty\n\n    \nname\ndt.operator.Filter.port.input.attr.PARTITION_PARALLEL\n/name\n\n    \nvalue\ntrue\n/value\n\n  \n/property\n\n  \nproperty\n\n    \nname\ndt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL\n/name\n\n    \nvalue\ntrue\n/value\n\n  \n/property\n\n\n\n\n\nAbstractFSReadAheadLineReader\n\n\nThis extension of \nAbstractFSBlockReader\n parses lines from a block and binds the \nreaderContext\n field to an instance of \nReaderContext.ReadAheadLineReaderContext\n.\n\n\nIt is abstract because it doesn't provide an implementation of \nconvertToRecord(byte[] bytes)\n since the user may want to convert the bytes that make a line into some other type. \n\n\nReadAheadLineReaderContext\n\n\nIn order to handle a line split across adjacent blocks, ReadAheadLineReaderContext always reads beyond the block boundary and ignores the bytes till the first end
 -of-line character of all the blocks except the first block of the file. This ensures that no line is missed or incomplete.\n\n\nThis is one of the most common ways of handling a split record. It doesn't require any further information to decide if a line is complete. However, the cost of this consistent way to handle a line split is that it always reads from the next block.\n\n\nAbstractFSLineReader\n\n\nSimilar to \nAbstractFSReadAheadLineReader\n, even this parses lines from a block. However, it binds the \nreaderContext\n field to an instance of \nReaderContext.LineReaderContext\n.\n\n\nLineReaderContext\n\n\nThis handles the line split differently from \nReadAheadLineReaderContext\n. It doesn't always read from the next block. If the end of the last line is aligned with the block boundary then it stops processing the block. It does read from the next block when the boundaries are not aligned, that is, last line extends beyond the block boundary. The result of this is an inconsi
 stency in reading the next block.\n\n\nWhen the boundary of the last line of the previous block was aligned with its block, then the first line of the current block is a valid line. However, in the other case the bytes from the block start offset to the first end-of-line character should be ignored. Therefore, this means that any record formed by this reader context has to be validated. For example, if the lines are of fixed size then size of each record can be validated or if each line begins with a special field then that knowledge can be used to check if a record is complete.\n\n\nIf the validations of completeness fails for a line then \nconvertToRecord(byte[] bytes)\n should return null.\n\n\nFSSliceReader\n\n\nA concrete extension of \nAbstractFSBlockReader\n that reads fixed-size \nbyte[]\n from a block and emits the byte array wrapped in \ncom.datatorrent.netlet.util.Slice\n.\n\n\nThis operator binds the \nreaderContext\n to an instance of \nReaderContext.FixedBytesReaderCon
 text\n.\n\n\nFixedBytesReaderContext\n\n\nThis implementation of \nReaderContext\n never reads beyond a block boundary which can result in the last \nbyte[]\n of a block to be of a shorter length than the rest of the records.\n\n\nConfiguration\n\n\nreaderContext.length\n: length of each record. By default, this is initialized to the default hdfs block size.\n\n\nPartitioner and StatsListener\n\n\nThe logical instance of the block reader acts as the Partitioner (unless a custom partitioner is set using the operator attribute - \nPARTITIONER\n) as well as a StatsListener. This is because the \n\nAbstractBlockReader\n implements both the \ncom.datatorrent.api.Partitioner\n and \ncom.datatorrent.api.StatsListener\n interfaces and provides an implementation of \ndefinePartitions(...)\n and \nprocessStats(...)\n which make it auto-scalable.\n\n\nprocessStats \n\n\nThe application master invokes \nResponse processStats(BatchedOperatorStats stats)\n method on the logical instance with the 
 stats (\ntuplesProcessedPSMA\n, \ntuplesEmittedPSMA\n, \nlatencyMA\n, etc.) of each partition. The data which this operator is interested in is the \nqueueSize\n of the input port \nblocksMetadataInput\n.\n\n\nUsually the \nqueueSize\n of an input port gives the count of waiting control tuples plus data tuples. However, if a stats listener is interested only in the count of data tuples then that can be expressed by annotating the class with \n@DataQueueSize\n. In this case \nAbstractBlockReader\n itself is the \nStatsListener\n which is why it is annotated with \n@DataQueueSize\n.\n\n\nThe logical instance caches the queue size per partition and at regular intervals (configured by \nintervalMillis\n) sums these values to find the total backlog which is then used to decide whether re-partitioning is needed. The flow-diagram below describes this logic.\n\n\n\n\nThe goal of this logic is to create as many partitions within bounds (see \nmaxReaders\n and \nminReaders\n above) to quickly
  reduce this backlog or if the backlog is small then remove any idle partitions.\n\n\ndefinePartitions\n\n\nBased on the \nrepartitionRequired\n field of the \nResponse\n object which is returned by \nprocessStats\n method, the application master invokes \n\n\nCollection\nPartition\nAbstractBlockReader\n...\n definePartitions(Collection\nPartition\nAbstractBlockReader\n...\n partitions, PartitioningContext context)\n\n\n\n\non the logical instance which is also the partitioner instance. The implementation calculates the difference between required partitions and the existing count of partitions. If this difference is negative, then equivalent number of partitions are removed otherwise new partitions are created. \n\n\nPlease note auto-scaling can be disabled by setting \ncollectStats\n to \nfalse\n. If the use-case requires only static partitioning, then that can be achieved by setting \nStatelessPartitioner\n as the operator attribute- \nPARTITIONER\n on the block reader.", 
+            "title": "Block Reader"
+        }, 
+        {
+            "location": "/operators/block_reader/#block-reader", 
+            "text": "This is a scalable operator that reads and parses blocks of data sources into records. A data source can be a file or a message bus that contains records and a block defines a chunk of data in the source by specifying the block offset and the length of the source belonging to the block.", 
+            "title": "Block Reader"
+        }, 
+        {
+            "location": "/operators/block_reader/#why-is-it-needed", 
+            "text": "A Block Reader is needed to parallelize reading and parsing of a single data source, for example a file. Simple parallelism of reading data sources can be achieved by multiple partitions reading different source of same type (for files see  AbstractFileInputOperator ) but Block Reader partitions can read blocks of same source in parallel and parse them for records ensuring that no record is duplicated or missed.", 
+            "title": "Why is it needed?"
+        }, 
+        {
+            "location": "/operators/block_reader/#class-diagram", 
+            "text": "", 
+            "title": "Class Diagram"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractblockreader", 
+            "text": "This is the abstract implementation that serves as the base for different types of data sources. It defines how a block metadata is processed. The flow diagram below describes the processing of a block metadata.", 
+            "title": "AbstractBlockReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#ports", 
+            "text": "blocksMetadataInput: input port on which block metadata are received.    blocksMetadataOutput: output port on which block metadata are emitted if the port is connected. This port is useful when a downstream operator that receives records from block reader may also be interested to know the details of the corresponding blocks.    messages: output port on which tuples of type  com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord  are emitted. This class encapsulates a  record  and the  blockId  of the corresponding block.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/block_reader/#readercontext", 
+            "text": "This is one of the most important fields in the block reader. It is of type  com.datatorrent.lib.io.block.ReaderContext  and is responsible for fetching bytes that make a record. It also lets the reader know how many total bytes were consumed which may not be equal to the total bytes in a record because consumed bytes also include bytes for the record delimiter which may not be a part of the actual record.  Once the reader creates an input stream for the block (or uses the previous opened stream if the current block is successor of the previous block) it initializes the reader context by invoking  readerContext.initialize(stream, blockMetadata, consecutiveBlock); . Initialize method is where any implementation of  ReaderContext  can perform all the operations which have to be executed just before reading the block or create states which are used during the lifetime of reading the block.  Once the initialization is done,  readerContext.next()  is called repeatedl
 y until it returns  null . It is left to the  ReaderContext  implementations to decide when a block is completely processed. In cases when a record is split across adjacent blocks, reader context may decide to read ahead of the current block boundary to completely fetch the split record (examples-  LineReaderContext  and  ReadAheadLineReaderContext ). In other cases when there isn't a possibility of split record (example-  FixedBytesReaderContext ), it returns  null  immediately when the block boundary is reached. The return type of  readerContext.next()  is of type  com.datatorrent.lib.io.block.ReaderContext.Entity  which is just a wrapper for a  byte[]  that represents the record and total bytes used in fetching the record.", 
+            "title": "readerContext"
+        }, 
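+
+            A simplified sketch of the read loop described above, for a reader whose record type is String.
+            This is illustrative only, not the actual Malhar source; the Entity accessor getRecord() and the
+            ReaderRecord constructor arguments are assumptions based on the description.
+
+                // Drive the ReaderContext over one block and emit the records it yields.
+                readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+                ReaderContext.Entity entity;
+                while ((entity = readerContext.next()) != null) {  // null means the block is fully processed
+                  // entity wraps the record bytes plus the total bytes consumed to fetch them
+                  String record = convertToRecord(entity.getRecord());
+                  if (record != null) {
+                    messages.emit(new AbstractBlockReader.ReaderRecord<String>(blockMetadata.getBlockId(), record));
+                  }
+                }
+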
+        {
+            "location": "/operators/block_reader/#abstract-methods", 
+            "text": "STREAM setupStream(B block) : creating a stream for a block is dependent on the type of source which is not known to AbstractBlockReader. Sub-classes which deal with a specific data source provide this implementation.    R convertToRecord(byte[] bytes) : this converts the array of bytes into the actual instance of record type.", 
+            "title": "Abstract methods"
+        }, 
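+
+            A hedged sketch of both abstract methods for a file-based reader whose records are plain Strings.
+            AbstractFSBlockReader (used in the example application below) already supplies a suitable
+            setupStream(...), so overriding it here is purely illustrative; the protected FileSystem handle
+            'fs' is assumed to be maintained by that base class.
+
+                public static class RawLineReader extends AbstractFSBlockReader<String>
+                {
+                  @Override
+                  protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
+                  {
+                    // Open the file that backs this block; the base class keeps the FileSystem handle.
+                    return fs.open(new Path(block.getFilePath()));
+                  }
+
+                  @Override
+                  protected String convertToRecord(byte[] bytes)
+                  {
+                    return new String(bytes);
+                  }
+                }
+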
+        {
+            "location": "/operators/block_reader/#auto-scalability", 
+            "text": "Block reader can auto-scale, that is, depending on the backlog (total number of all the blocks which are waiting in the  blocksMetadataInput  port queue of all partitions) it can create more partitions or reduce them. Details are discussed in the last section which covers the  partitioner and stats-listener .", 
+            "title": "Auto-scalability"
+        }, 
+        {
+            "location": "/operators/block_reader/#configuration", 
+            "text": "maxReaders : when auto-scaling is enabled, this controls the maximum number of block reader partitions that can be created.  minReaders : when auto-scaling is enabled, this controls the minimum number of block reader partitions that should always exist.  collectStats : this enables or disables auto-scaling. When it is set to  true  the stats (number of blocks in the queue) are collected and this triggers partitioning; otherwise auto-scaling is disabled.  intervalMillis : when auto-scaling is enabled, this specifies the interval at which the reader will trigger the logic of computing the backlog and auto-scale.", 
+            "title": "Configuration"
+        }, 
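+
+            A minimal sketch of setting these knobs from populateDAG(); bean-style setters matching the
+            property names above are assumed, and the "Block-reader"/LineReader names follow the example
+            application below.
+
+                LineReader blockReader = dag.addOperator("Block-reader", new LineReader());
+                blockReader.setCollectStats(true);     // enable auto-scaling
+                blockReader.setMinReaders(1);          // never scale below 1 partition
+                blockReader.setMaxReaders(8);          // never scale beyond 8 partitions
+                blockReader.setIntervalMillis(60000);  // recompute the backlog once a minute
+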
+        {
+            "location": "/operators/block_reader/#example-application", 
+            "text": "This simple dag demonstrates how any concrete implementation of  AbstractFSBlockReader  can be plugged into an application.    In the above application, file splitter creates block metadata for files which are sent to block reader. Partitions of the block reader parses the file blocks for records which are filtered, transformed and then persisted to a file (created per block). Therefore block reader is parallel partitioned with the 2 downstream operators - filter/converter and record output operator. The code which implements this dag is below.  public class ExampleApplication implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    FileSplitterInput input = dag.addOperator( File-splitter , new FileSplitterInput());\n    //any concrete implementation of AbstractFSBlockReader based on the use-case can be added here.\n    LineReader blockReader = dag.addOperator( Block-reader , new LineReader());\n 
    Filter filter = dag.addOperator( Filter , new Filter());\n    RecordOutputOperator recordOutputOperator = dag.addOperator( Record-writer , new RecordOutputOperator());\n\n    dag.addStream( file-block metadata , input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    dag.addStream( records , blockReader.messages, filter.input);\n    dag.addStream( filtered-records , filter.output, recordOutputOperator.input);\n  }\n\n  /**\n   * Concrete implementation of {@link AbstractFSBlockReader} for which a record is a line in the file.\n   */\n  public static class LineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader String \n  {\n\n    @Override\n    protected String convertToRecord(byte[] bytes)\n    {\n      return new String(bytes);\n    }\n  }\n\n  /**\n   * Considers any line starting with a '.' as invalid. Emits the valid records.\n   */\n  public static class Filter extends BaseOperator\n  {\n    public final transient DefaultOutputPort AbstractBlockRea
 der.ReaderRecord String  output = new DefaultOutputPort ();\n    public final transient DefaultInputPort AbstractBlockReader.ReaderRecord String  input = new DefaultInputPort AbstractBlockReader.ReaderRecord String ()\n    {\n      @Override\n      public void process(AbstractBlockReader.ReaderRecord String  stringRecord)\n      {\n        //filter records and transform\n        //if the string starts with a '.' ignore the string.\n        if (!StringUtils.startsWith(stringRecord.getRecord(),  . )) {\n          output.emit(stringRecord);\n        }\n      }\n    };\n  }\n\n  /**\n   * Persists the valid records to corresponding block files.\n   */\n  public static class RecordOutputOperator extends AbstractFileOutputOperator AbstractBlockReader.ReaderRecord String \n  {\n    @Override\n    protected String getFileName(AbstractBlockReader.ReaderRecord String  tuple)\n    {\n      return Long.toHexString(tuple.getBlockId());\n    }\n\n    @Override\n    protected byte[] getBytesForTup
 le(AbstractBlockReader.ReaderRecord String  tuple)\n    {\n      return tuple.getRecord().getBytes();\n    }\n  }\n}  Configuration to parallel partition block reader with its downstream operators.     property \n     name dt.operator.Filter.port.input.attr.PARTITION_PARALLEL /name \n     value true /value \n   /property \n   property \n     name dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL /name \n     value true /value \n   /property", 
+            "title": "Example Application"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractfsreadaheadlinereader", 
+            "text": "This extension of  AbstractFSBlockReader  parses lines from a block and binds the  readerContext  field to an instance of  ReaderContext.ReadAheadLineReaderContext .  It is abstract because it doesn't provide an implementation of  convertToRecord(byte[] bytes)  since the user may want to convert the bytes that make a line into some other type.", 
+            "title": "AbstractFSReadAheadLineReader"
+        }, 
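+
+            A minimal sketch, assuming each line holds a single decimal number: the only method a concrete
+            subclass must supply is convertToRecord(...), which here turns the line's bytes into a Long and
+            returns null for lines that do not parse.
+
+                public static class LongLineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<Long>
+                {
+                  @Override
+                  protected Long convertToRecord(byte[] bytes)
+                  {
+                    try {
+                      return Long.parseLong(new String(bytes).trim());
+                    } catch (NumberFormatException e) {
+                      return null; // malformed line; treated as invalid, so no record is emitted for it
+                    }
+                  }
+                }
+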
+        {
+            "location": "/operators/block_reader/#readaheadlinereadercontext", 
+            "text": "In order to handle a line split across adjacent blocks, ReadAheadLineReaderContext always reads beyond the block boundary and ignores the bytes till the first end-of-line character of all the blocks except the first block of the file. This ensures that no line is missed or incomplete.  This is one of the most common ways of handling a split record. It doesn't require any further information to decide if a line is complete. However, the cost of this consistent way to handle a line split is that it always reads from the next block.", 
+            "title": "ReadAheadLineReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractfslinereader", 
+            "text": "Similar to  AbstractFSReadAheadLineReader , even this parses lines from a block. However, it binds the  readerContext  field to an instance of  ReaderContext.LineReaderContext .", 
+            "title": "AbstractFSLineReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#linereadercontext", 
+            "text": "This handles the line split differently from  ReadAheadLineReaderContext . It doesn't always read from the next block. If the end of the last line is aligned with the block boundary then it stops processing the block. It does read from the next block when the boundaries are not aligned, that is, last line extends beyond the block boundary. The result of this is an inconsistency in reading the next block.  When the boundary of the last line of the previous block was aligned with its block, then the first line of the current block is a valid line. However, in the other case the bytes from the block start offset to the first end-of-line character should be ignored. Therefore, this means that any record formed by this reader context has to be validated. For example, if the lines are of fixed size then size of each record can be validated or if each line begins with a special field then that knowledge can be used to check if a record is complete.  If the validations 
 of completeness fails for a line then  convertToRecord(byte[] bytes)  should return null.", 
+            "title": "LineReaderContext"
+        }, 
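+
+            A sketch of the validation described above for a hypothetical fixed-size-line format
+            (RECORD_LENGTH is an assumed constant): any byte[] of a different length is treated as an
+            incomplete line and rejected by returning null.
+
+                public static class FixedLengthLineReader extends AbstractFSBlockReader.AbstractFSLineReader<String>
+                {
+                  private static final int RECORD_LENGTH = 64; // assumed fixed line length
+
+                  @Override
+                  protected String convertToRecord(byte[] bytes)
+                  {
+                    return bytes.length == RECORD_LENGTH ? new String(bytes) : null;
+                  }
+                }
+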
+        {
+            "location": "/operators/block_reader/#fsslicereader", 
+            "text": "A concrete extension of  AbstractFSBlockReader  that reads fixed-size  byte[]  from a block and emits the byte array wrapped in  com.datatorrent.netlet.util.Slice .  This operator binds the  readerContext  to an instance of  ReaderContext.FixedBytesReaderContext .", 
+            "title": "FSSliceReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#fixedbytesreadercontext", 
+            "text": "This implementation of  ReaderContext  never reads beyond a block boundary which can result in the last  byte[]  of a block to be of a shorter length than the rest of the records.", 
+            "title": "FixedBytesReaderContext"
+        }, 
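+
+            A hedged sketch of a downstream operator consuming the slices emitted by FSSliceReader; because
+            the last slice of a block may be shorter, the consumer respects slice.length instead of assuming
+            a fixed size.
+
+                public static class SliceConsumer extends BaseOperator
+                {
+                  public final transient DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>> input =
+                      new DefaultInputPort<AbstractBlockReader.ReaderRecord<Slice>>()
+                  {
+                    @Override
+                    public void process(AbstractBlockReader.ReaderRecord<Slice> tuple)
+                    {
+                      Slice slice = tuple.getRecord();
+                      // Copy only the bytes that belong to this record.
+                      byte[] copy = new byte[slice.length];
+                      System.arraycopy(slice.buffer, slice.offset, copy, 0, slice.length);
+                      // ... process the copied bytes ...
+                    }
+                  };
+                }
+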
+        {
+            "location": "/operators/block_reader/#configuration_1", 
+            "text": "readerContext.length : length of each record. By default, this is initialized to the default hdfs block size.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/block_reader/#partitioner-and-statslistener", 
+            "text": "The logical instance of the block reader acts as the Partitioner (unless a custom partitioner is set using the operator attribute -  PARTITIONER ) as well as a StatsListener. This is because the  AbstractBlockReader  implements both the  com.datatorrent.api.Partitioner  and  com.datatorrent.api.StatsListener  interfaces and provides an implementation of  definePartitions(...)  and  processStats(...)  which make it auto-scalable.", 
+            "title": "Partitioner and StatsListener"
+        }, 
+        {
+            "location": "/operators/block_reader/#processstats", 
+            "text": "The application master invokes  Response processStats(BatchedOperatorStats stats)  method on the logical instance with the stats ( tuplesProcessedPSMA ,  tuplesEmittedPSMA ,  latencyMA , etc.) of each partition. The data which this operator is interested in is the  queueSize  of the input port  blocksMetadataInput .  Usually the  queueSize  of an input port gives the count of waiting control tuples plus data tuples. However, if a stats listener is interested only in the count of data tuples then that can be expressed by annotating the class with  @DataQueueSize . In this case  AbstractBlockReader  itself is the  StatsListener  which is why it is annotated with  @DataQueueSize .  The logical instance caches the queue size per partition and at regular intervals (configured by  intervalMillis ) sums these values to find the total backlog which is then used to decide whether re-partitioning is needed. The flow-diagram below describes this logic.   The goal of this l
 ogic is to create as many partitions within bounds (see  maxReaders  and  minReaders  above) to quickly reduce this backlog or if the backlog is small then remove any idle partitions.", 
+            "title": "processStats "
+        }, 
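+
+            An illustrative sketch (not the actual Malhar implementation) of the decision described above;
+            extractQueueSize(...) and shouldRepartition(...) are hypothetical helpers standing in for the
+            details of reading the port stats and applying the bounds.
+
+                private final Map<Integer, Integer> backlogPerPartition = new HashMap<>();
+                private long lastAdjustmentMillis;
+
+                public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats)
+                {
+                  // Cache the latest queue size reported for this physical partition.
+                  backlogPerPartition.put(stats.getOperatorId(), extractQueueSize(stats));
+
+                  StatsListener.Response response = new StatsListener.Response();
+                  if (System.currentTimeMillis() - lastAdjustmentMillis >= intervalMillis) {
+                    lastAdjustmentMillis = System.currentTimeMillis();
+                    int totalBacklog = 0;
+                    for (int queueSize : backlogPerPartition.values()) {
+                      totalBacklog += queueSize;
+                    }
+                    // Ask for re-partitioning when the backlog calls for more (or fewer)
+                    // partitions, staying within [minReaders, maxReaders].
+                    response.repartitionRequired = shouldRepartition(totalBacklog);
+                  }
+                  return response;
+                }
+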
+        {
+            "location": "/operators/block_reader/#definepartitions", 
+            "text": "Based on the  repartitionRequired  field of the  Response  object which is returned by  processStats  method, the application master invokes   Collection Partition AbstractBlockReader ...  definePartitions(Collection Partition AbstractBlockReader ...  partitions, PartitioningContext context)  on the logical instance which is also the partitioner instance. The implementation calculates the difference between required partitions and the existing count of partitions. If this difference is negative, then equivalent number of partitions are removed otherwise new partitions are created.   Please note auto-scaling can be disabled by setting  collectStats  to  false . If the use-case requires only static partitioning, then that can be achieved by setting  StatelessPartitioner  as the operator attribute-  PARTITIONER  on the block reader.", 
+            "title": "definePartitions"
+        }, 
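+
+            A minimal sketch of the static-partitioning alternative mentioned above, fixing the block reader
+            at four partitions; setCollectStats(false) is assumed to follow the collectStats property, and
+            the "Block-reader"/LineReader names follow the example application.
+
+                LineReader blockReader = dag.addOperator("Block-reader", new LineReader());
+                blockReader.setCollectStats(false); // turn off stats-driven auto-scaling
+                dag.setAttribute(blockReader, OperatorContext.PARTITIONER,
+                    new StatelessPartitioner<LineReader>(4));
+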
+        {
+            "location": "/operators/csvformatter/", 
+            "text": "CsvFormatter\n\n\nOperator Objective\n\n\nThis operator receives a POJO (\nPlain Old Java Object\n) as an incoming tuple, converts the data in \nthe incoming POJO to a custom delimited string and emits the delimited string.\n\n\nCsvFormatter supports schema definition as a JSON string. \n\n\nCsvFormatter does not hold any state and is \nidempotent\n, \nfault-tolerant\n and \nstatically/dynamically partitionable\n.\n\n\nOperator Information\n\n\n\n\nOperator location: \nmalhar-contrib\n\n\nAvailable since: \n3.2.0\n\n\nOperator state: \nEvolving\n\n\nJava Packages:\n\n\nOperator: \ncom.datatorrent.contrib.formatter.CsvFormatter\n\n\n\n\n\n\n\n\nProperties, Attributes and Ports\n\n\nProperties of POJOEnricher\n\n\n\n\n\n\n\n\nProperty\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\nDefault Value\n\n\n\n\n\n\n\n\n\n\nschema\n\n\nContents of the schema.Schema is specified in a json format.\n\n\nString\n\n\nYes\n\n\nN/A\n\n\n\n\n\n\n\n\nPlatform Attributes that influe
 nces operator behavior\n\n\n\n\n\n\n\n\nAttribute\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\n\n\n\n\n\n\n\n\nin.TUPLE_CLASS\n\n\nTUPLE_CLASS attribute on input port which tells operator the class of POJO which will be incoming\n\n\nClass or FQCN\n\n\nYes\n\n\n\n\n\n\n\n\nPorts\n\n\n\n\n\n\n\n\nPort\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\n\n\n\n\n\n\n\n\nin\n\n\nTuples which need to be formatted are received on this port\n\n\nObject (POJO)\n\n\nYes\n\n\n\n\n\n\nout\n\n\nTuples that are formatted are emitted from this port\n\n\nString\n\n\nNo\n\n\n\n\n\n\nerr\n\n\nTuples that could not be converted are emitted on this port\n\n\nObject\n\n\nNo\n\n\n\n\n\n\n\n\nLimitations\n\n\nCurrent CsvFormatter contain following limitations:\n\n\n\n\nThe field names in schema and the pojo field names should match.For eg. if name of the schema field is \"customerName\", then POJO should contain a field with the same name. \n\n\nField wise validation/formatting is not yet supported.\n\n\nTh
 e fields will be written to the file in the same order as specified in schema.json\n\n\n\n\nExample\n\n\nExample for CsvFormatter can be found at: \nhttps://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter\n\n\nAdvanced\n\n\n Schema format for CsvFormatter\n\n\nCsvFormatter expects schema to be a String in JSON format:\n\n\nExample for format of schema:\n\n\n{\n  \nseparator\n: \n,\n,\n  \nquoteChar\n: \n\\\n,\n  \nlineDelimiter\n: \n\\n\n,\n  \nfields\n: [\n    {\n      \nname\n: \ncampaignId\n,\n      \ntype\n: \nInteger\n\n    },\n    {\n      \nname\n: \nstartDate\n,\n      \ntype\n: \nDate\n,\n      \nconstraints\n: {\n        \nformat\n: \nyyyy-MM-dd\n\n      }\n    }\n    ]\n}\n\n\n\n\nPartitioning of CsvFormatter\n\n\nBeing stateless operator, CsvFormatter will ensure built-in partitioners present in Malhar library can be directly used by setting properties as follows:\n\n\nStateless partioning of CsvFormatter\n\n\nStateless partitioning will ensure that Cs
 vFormatter will be partitioned right at the start of the application and will remain partitioned throughout the lifetime of the DAG.\nCsvFormatter can be stateless partitioned by adding following lines to properties.xml:\n\n\n  \nproperty\n\n    \nname\ndt.operator.{OperatorName}.attr.PARTITIONER\n/name\n\n    \nvalue\ncom.datatorrent.common.partitioner.StatelessPartitioner:2\n/value\n\n  \n/property\n\n\n\n\n\nwhere {OperatorName} is the name of the CsvFormatter operator.\nAbove lines will partition CsvFormatter statically 2 times. Above value can be changed accordingly to change the number of static partitions.\n\n\nDynamic Partitioning of CsvFormatter\n\n\nDynamic partitioning is a feature of Apex platform which changes the partition of the operator based on certain conditions.\nCsvFormatter can be dynamically partitioned using below out-of-the-box partitioner:\n\n\nThroughput based\n\n\nFollowing code can be added to populateDAG method of application to dynamically partition Csv
 Formatter:\n\n\n    StatelessThroughputBasedPartitioner\nCsvFormatter\n partitioner = new StatelessThroughputBasedPartitioner\n();\n    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));\n    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));\n    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));\n    dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));\n    dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner);\n\n\n\n\nAbove code will dynamically partition CsvFormatter when throughput changes.\nIf overall throughput of CsvFormatter goes beyond 30000 or less than 10000, the platform will repartition CsvFormatter \nto balance throughput of a single partition to be between 10000 and 30000.\nCooldownMillis of 10000 will be used as threshold time for which  throughput change is observed.", 
+            "title": "CSV Formatter"
+        }, 
+        {
+            "location": "/operators/csvformatter/#csvformatter", 
+            "text": "", 
+            "title": "CsvFormatter"
+        }, 
+        {
+            "location": "/operators/csvformatter/#operator-objective", 
+            "text": "This operator receives a POJO ( Plain Old Java Object ) as an incoming tuple, converts the data in \nthe incoming POJO to a custom delimited string and emits the delimited string.  CsvFormatter supports schema definition as a JSON string.   CsvFormatter does not hold any state and is  idempotent ,  fault-tolerant  and  statically/dynamically partitionable .", 
+            "title": "Operator Objective"
+        }, 
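+
+            A hedged sketch of wiring CsvFormatter in populateDAG(): the JSON passed to setSchema(...)
+            mirrors the schema example later on this page, setSchema(String) is assumed from the "schema"
+            property listed below, and PojoEvent is a hypothetical input POJO class.
+
+                CsvFormatter formatter = dag.addOperator("Formatter", new CsvFormatter());
+                formatter.setSchema("{\"separator\":\",\",\"lineDelimiter\":\"\\n\","
+                    + "\"fields\":[{\"name\":\"campaignId\",\"type\":\"Integer\"},"
+                    + "{\"name\":\"startDate\",\"type\":\"Date\",\"constraints\":{\"format\":\"yyyy-MM-dd\"}}]}");
+                // Tell the input port which POJO class to expect.
+                dag.setInputPortAttribute(formatter.in, Context.PortContext.TUPLE_CLASS, PojoEvent.class);
+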
+        {
+            "location": "/operators/csvformatter/#operator-information", 
+            "text": "Operator location:  malhar-contrib  Available since:  3.2.0  Operator state:  Evolving  Java Packages:  Operator:  com.datatorrent.contrib.formatter.CsvFormatter", 
+            "title": "Operator Information"
+        }, 
+        {
+            "location": "/operators/csvformatter/#properties-attributes-and-ports", 
+            "text": "", 
+            "title": "Properties, Attributes and Ports"
+        }, 
+        {
+            "location": "/operators/csvformatter/#platform-attributes-that-influences-operator-behavior", 
+            "text": "Attribute  Description  Type  Mandatory      in.TUPLE_CLASS  TUPLE_CLASS attribute on input port which tells operator the class of POJO which will be incoming  Class or FQCN  Yes", 
+            "title": "Platform Attributes that influences operator behavior"
+        }, 
+        {
+            "location": "/operators/csvformatter/#ports", 
+            "text": "Port  Description  Type  Mandatory      in  Tuples which need to be formatted are received on this port  Object (POJO)  Yes    out  Tuples that are formatted are emitted from this port  String  No    err  Tuples that could not be converted are emitted on this port  Object  No", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/csvformatter/#limitations", 
+            "text": "Current CsvFormatter contain following limitations:   The field names in schema and the pojo field names should match.For eg. if name of the schema field is \"customerName\", then POJO should contain a field with the same name.   Field wise validation/formatting is not yet supported.  The fields will be written to the file in the same order as specified in schema.json", 
+            "title": "Limitations"
+        }, 
+        {
+            "location": "/operators/csvformatter/#example", 
+            "text": "Example for CsvFormatter can be found at:  https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter", 
+            "title": "Example"
+        }, 
+        {
+            "location": "/operators/csvformatter/#advanced", 
+            "text": "", 
+            "title": "Advanced"
+        }, 
+        {
+            "location": "/operators/csvformatter/#partitioning-of-csvformatter", 
+            "text": "Being stateless operator, CsvFormatter will ensure built-in partitioners present in Malhar library can be directly used by setting properties as follows:", 
+            "title": "Partitioning of CsvFormatter"
+        }, 
+        {
+            "location": "/operators/csvformatter/#stateless-partioning-of-csvformatter", 
+            "text": "Stateless partitioning will ensure that CsvFormatter will be partitioned right at the start of the application and will remain partitioned throughout the lifetime of the DAG.\nCsvFormatter can be stateless partitioned by adding following lines to properties.xml:     property \n     name dt.operator.{OperatorName}.attr.PARTITIONER /name \n     value com.datatorrent.common.partitioner.StatelessPartitioner:2 /value \n   /property   where {OperatorName} is the name of the CsvFormatter operator.\nAbove lines will partition CsvFormatter statically 2 times. Above value can be changed accordingly to change the number of static partitions.", 
+            "title": "Stateless partioning of CsvFormatter"
+        }, 
+        {
+            "location": "/operators/csvformatter/#dynamic-partitioning-of-csvformatter", 
+            "text": "Dynamic partitioning is a feature of Apex platform which changes the partition of the operator based on certain conditions.\nCsvFormatter can be dynamically partitioned using below out-of-the-box partitioner:", 
+            "title": "Dynamic Partitioning of CsvFormatter"
+        }, 
+        {
+            "location": "/operators/csvformatter/#throughput-based", 
+            "text": "Following code can be added to populateDAG method of application to dynamically partition CsvFormatter:      StatelessThroughputBasedPartitioner CsvFormatter  partitioner = new StatelessThroughputBasedPartitioner ();\n    partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));\n    partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));\n    partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));\n    dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));\n    dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner);  Above code will dynamically partition CsvFormatter when throughput changes.\nIf overall throughput of CsvFormatter goes beyond 30000 or less than 10000, the platform will repartition CsvFormatter \nto balance throughput of a single partition to be between 10000 and 30000.\nCooldownMillis of 10000 will be used as threshold time for which  t
 hroughput change is observed.", 
+            "title": "Throughput based"
+        }, 
+        {
+            "location": "/operators/csvParserOperator/", 
+            "text": "Csv Parser Operator\n\n\nOperator Objective\n\n\nThis operator is designed to parse delimited records and construct a map or concrete java class also known as \n\"POJO\"\n out of it. User need to provide the schema to describe the delimited data. Based on schema definition the operator will parse the incoming record to object map and POJO.  User can also provide constraints if any, in the schema. The supported constraints are listed in \nconstraints table\n. The incoming record will be validated against those constraints. Valid records will be emitted as POJO / map while invalid ones are emitted on error port with error message.\n\n\nNote\n: field names of POJO must match field names in schema and in the same order as it appears in the incoming data.\n\n\nOverview\n\n\nThe operator is \nidempotent\n, \nfault-tolerant\n and \npartitionable\n.\n\n\nClass Diagram\n\n\n\n\nOperator Information\n\n\n\n\nOperator location:\nmalhar-contrib\n\n\nAvailable since:\n3.2.0\
 n\n\nOperator state:\nEvolving\n\n\nJava Package:\ncom.datatorrent.contrib.parser.CsvParser\n\n\

<TRUNCATED>