You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/03/28 06:32:49 UTC
[07/16] apex-site git commit: Adding malhar-3.7.0 documentation
http://git-wip-us.apache.org/repos/asf/apex-site/blob/b25c090d/docs/malhar-3.7/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/docs/malhar-3.7/mkdocs/search_index.json b/docs/malhar-3.7/mkdocs/search_index.json
new file mode 100644
index 0000000..5ded716
--- /dev/null
+++ b/docs/malhar-3.7/mkdocs/search_index.json
@@ -0,0 +1,2134 @@
+{
+ "docs": [
+ {
+ "location": "/",
+ "text": "Apache Apex Malhar\n\n\nApache Apex Malhar is an open source operator and codec library that can be used with the \nApache Apex\n platform to build real-time streaming applications. Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop. In addition to the operators, the library contains a number of example applications, demonstrating operator features and capabilities.\n\n\n\n\nCapabilities common across Malhar operators\n\n\nFor most streaming platforms, connectors are afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the platform. As a result, they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have the following core streaming runtime capabilities\n\n\n\n\nFault tolerance\n \u2013 Malhar operators where app
licable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario.\n\n\nProcessing guarantees\n \u2013 Malhar operators where applicable provide out of the box support for ALL three processing guarantees \u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code. Some operators, like the MQTT operator, deal with source systems that cannot track processed data and hence need the operators to keep track of the data. Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this. Finally, for databases that support transactions or any sort of atomic batch operations, Malhar operators can do exactly once down to the tuple level.\n\n\nDynamic updates\n \u2013 Based on changing business conditions, you often have to tweak several parameters used by the operators in your streaming application without inc
urring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application.\n\n\nEase of extensibility\n \u2013 Malhar operators are based on templates that are easy to extend.\n\n\nPartitioning support\n \u2013 In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system. For example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.\n\n\n\n\nOperator Library Overview\n\n\nInput/output connectors\n\n\nBelow is a summary of the various sub categories of input and output operators. Input operators also have a corresponding output operator\n\n\n\n\nFile Systems\n \u2013 Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the appli
cation is running in AWS. Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share. Apex supports input \n output operators for HDFS, S3, NFS \n Local Files. There are also File Splitter and Block Reader operators, which can accelerate processing of large files by splitting and parallelizing the work across non-overlapping sets of file blocks.\n\n\nRelational Databases\n \u2013 Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational dashboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC-compliant RDBMS like Oracle, MySQL, SQLite, etc.\n\n\nNoSQL Databases\n \u2013 NoSQL key-value pair databases like Cassandra \n HBase are a common part of streaming analytics applica
tion architectures to lookup reference data or store results. Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB.\n\n\nMessaging Systems\n \u2013 Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises. Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages.\n\n\nNotification Systems\n \u2013 Malhar includes an operator for sending notifications via SMTP.\n\n\nIn-memory Databases \n Caching platforms\n - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar has operators for memcached and Redis.\n\n\nSocial Media\n - Malhar includes an operator to connect to the popular Twitter stream fire hose.\n\n\nProtocols\n - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and M
QTT.\n\n\n\n\nParsers\n\n\nThere are many industry vertical specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example in the Telco space, a Java based CDR parser can be directly plugged into Apache Apex operator. To further simplify development experience, Malhar also provides some operators for parsing common formats like XML (DOM \n SAX), JSON (flat map converter), Apache log files, syslog, etc.\n\n\nStream manipulation\n\n\nStreaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW. Malhar has several operators to perform the common stream manipulation actions like \u2013 GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, O
uter join, Select, Update, etc.\n\n\nCompute\n\n\nOne of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However, delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support a variety of common compute functions, as the user then has to worry about making these scalable, fault tolerant, stateful, etc. Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators.\n\n\nBelow is just a snapshot of the compute operators available in Malhar\n\n\n\n\nStatistics and math - Various mathematical and statistical computations over application-defined time windows.\n\n\nFiltering and pattern matching\n\n\nSorting, maps, frequency, TopN, BottomN\n\n\nRandom data generators\n\n\n\n\nLanguages Support\n\n\nMigrating to a new platform often requires re-use of existing code that would be diff
icult or time-consuming to re-write. With this in mind, Malhar supports invocation of code written in other languages by wrapping it in one of the library operators, and allows execution of software written in:\n\n\n\n\nJavaScript\n\n\nPython\n\n\nR\n\n\nRuby",
+ "title": "Apache Apex Malhar"
+ },
+ {
+ "location": "/#apache-apex-malhar",
+ "text": "Apache Apex Malhar is an open source operator and codec library that can be used with the Apache Apex platform to build real-time streaming applications. Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop. In addition to the operators, the library contains a number of example applications, demonstrating operator features and capabilities.",
+ "title": "Apache Apex Malhar"
+ },
+ {
+ "location": "/#capabilities-common-across-malhar-operators",
+ "text": "For most streaming platforms, connectors are afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the platform. As a result, they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have the following core streaming runtime capabilities Fault tolerance \u2013 Malhar operators where applicable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario. Processing guarantees \u2013 Malhar operators where applicable provide out of the box support for ALL three processing guarantees \u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code. Some operators, like the MQTT operator, deal with source systems that cannot track processed data and hence n
eed the operators to keep track of the data. Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this. Finally for databases that support transactions or support any sort of atomic batch operations Malhar operators can do exactly once down to the tuple level. Dynamic updates \u2013 Based on changing business conditions you often have to tweak several parameters used by the operators in your streaming application without incurring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application. Ease of extensibility \u2013 Malhar operators are based on templates that are easy to extend. Partitioning support \u2013 In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system. Fo
r example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.",
+ "title": "Capabilities common across Malhar operators"
+ },
+ {
+ "location": "/#operator-library-overview",
+ "text": "",
+ "title": "Operator Library Overview"
+ },
+ {
+ "location": "/#inputoutput-connectors",
+ "text": "Below is a summary of the various subcategories of input and output operators. Input operators also have a corresponding output operator. File Systems \u2013 Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the application is running in AWS. Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share. Apex supports input output operators for HDFS, S3, NFS Local Files. There are also File Splitter and Block Reader operators, which can accelerate processing of large files by splitting and parallelizing the work across non-overlapping sets of file blocks. Relational Databases \u2013 Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational das
hboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC-compliant RDBMS like Oracle, MySQL, SQLite, etc. NoSQL Databases \u2013 NoSQL key-value pair databases like Cassandra HBase are a common part of streaming analytics application architectures to look up reference data or store results. Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB. Messaging Systems \u2013 Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises. Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages. Notification Systems \u2013 Malhar includes an operator for sending notifications via SMTP. In-memory Databases Caching platforms - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar
has operators for memcached and Redis. Social Media - Malhar includes an operator to connect to the popular Twitter stream fire hose. Protocols - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and MQTT.",
+ "title": "Input/output connectors"
+ },
+ {
+ "location": "/#parsers",
+ "text": "There are many industry-vertical-specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example, in the Telco space, a Java-based CDR parser can be directly plugged into an Apache Apex operator. To further simplify the development experience, Malhar also provides some operators for parsing common formats like XML (DOM SAX), JSON (flat map converter), Apache log files, syslog, etc.",
+ "title": "Parsers"
+ },
+ {
+ "location": "/#stream-manipulation",
+ "text": "Streaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW. Malhar has several operators to perform the common stream manipulation actions like \u2013 GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Outer join, Select, Update etc.",
+ "title": "Stream manipulation"
+ },
+ {
+ "location": "/#compute",
+ "text": "One of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However, delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support a variety of common compute functions, as the user then has to worry about making these scalable, fault tolerant, stateful, etc. Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators. Below is just a snapshot of the compute operators available in Malhar Statistics and math - Various mathematical and statistical computations over application-defined time windows. Filtering and pattern matching Sorting, maps, frequency, TopN, BottomN Random data generators",
+ "title": "Compute"
+ },
+ {
+ "location": "/#languages-support",
+ "text": "Migrating to a new platform often requires re-use of existing code that would be difficult or time-consuming to re-write. With this in mind, Malhar supports invocation of code written in other languages by wrapping it in one of the library operators, and allows execution of software written in: JavaScript Python R Ruby",
+ "title": "Languages Support"
+ },
+ {
+ "location": "/apis/calcite/",
+ "text": "Apache Apex is a unified stream and batch processing engine that enables application developers to process data at very high throughput with low latency. Although the different types of data have different processing needs, SQL remains a popular and generic way of processing data. To ensure that existing ETL developers and developers who are well versed with database applications adopt stream processing application development with ease, integration of SQL with Apex was needed. Being a popular Apache project, Apache Calcite was chosen for this purpose and its integration with Apex is described below.\n\n\nApex-Calcite Integration\n\n\nApache Calcite is a highly customizable engine for parsing and planning queries on relational data from various data sources; it provides storage-independent optimization of queries and ways to integrate them into other frameworks that would like to take advantage of it and expose SQL capability to their users. For details, please re
ad the \nApache Calcite Website\n. \n\n\nParticularly in SQL on Apex, Calcite processes a query and then creates relational algebra to create processing pipelines. These relational algebra processing pipelines are converted to a DAG with a set of operators to perform business logic on streaming data.\n\n\n\n\nThe figure above explains how a SQL query gets converted to an Apex DAG.\n\n\n\n\nThe user-specified query is processed by the Calcite query planner; this involves parsing and optimizing the query to generate a Relation Expression Tree. \n\n\nThis Relation Expression Tree is received by Apache Apex\u2019s SQL module to finally convert to an Apex DAG having a series of operators.\n\n\n\n\nOne peculiarity of Calcite queries is that the data source and destination need not be RDBMS systems; in the above example, \nFile\n refers to a file in the filesystem and \nKafka\n to a Kafka message broker. Calcite allows Apex to register table sources and destinations as anything which can return row type resul
ts. So a \u201cscan\u201d relational expression gets converted to \u201cKafkaInputOperator + ParseOperator\u201d, the result of which is a series of POJOs reflecting a Row Type. Similarly, the \u201cinsert\u201d Relational Expression is translated to \u201cFormatOperator + FileOutputOperator\u201d.\n\n\nFor more details about the integration, click \nhere\n.\n\n\nSQL APIs for Apache Apex\n\n\nListed below are the Java APIs which can be used by SQL/Apex users to create a DAG in the implementation of the \npopulateDAG\n method of the \nStreamingApplication\n interface.\n\n\n\n\n\n\n\n\nAPI\n\n\nDescription\n\n\n\n\n\n\n\n\n\n\nSQLExecEnvironment.getEnvironment()\n\n\nCreates a new SQL execution environment\n\n\n\n\n\n\nSQLExecEnvironment.registerTable(tableName, endpointInstance)\n\n\nRegisters a new abstract table with the existing environment. \nendpointInstance\n is an object of type \nEndpoint\n which defines a table.\n\n\n\n\n\n\nSQLExecEnvironment.registerFunction(sqlFunctionName, holderCl
ass, staticFunctionName)\n\n\nRegisters a new User Defined Scalar function\n\n\n\n\n\n\nSQLExecEnvironment.executeSQL(dag, sqlStatement)\n\n\nCreates a DAG for a particular SQL statement\n\n\n\n\n\n\n\n\nUsage of the above APIs is described in detail in the following sections.\n\n\nExample 1: Pure Style SQL Application\n\n\nWith Apache Calcite Integration, you can use SQL queries across different data sources and provide UDFs (User Defined Functions) as per your business logic. This example will use a Kafka topic as the source and an HDFS file as the destination.\nThe following application code will be used to explain the APIs. Actual source code can be found \nhere\n.\n\n\n public class PureStyleSQLApplication implements StreamingApplication\n {\n @Override\n public void populateDAG(DAG dag, Configuration conf)\n {\n // Create new SQLExecEnvironment\n SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n\n // This is a string that defines a schema and is dis
cussed in more detail in \nRegistering tables with SQLExecEnvironment\n section \n String inputSchemaString = \n...\n;\n\n // similar to inputSchemaString, we also need to define outputSchemaString\n String outputSchemaString = \n...\n;\n\n // Register KafkaEndpoint as \nORDERS\n table with Kafka topic and data format as CSV\n sqlEnv = sqlEnv.registerTable( \n \nORDERS\n, \n new KafkaEndpoint(\nlocalhost:9090\n, \n \ninputTopic\n, \n new CSVMessageFormat(inputSchemaString))\n );\n\n // Register FileEndpoint as \nSALES\n table with file path and data format as CSV\n sqlEnv = sqlEnv.registerTable( \n \nSALES\n, \n new FileEndpoint(\n/tmp/output\n, \n
\nout.file\n, \n new CSVMessageFormat(outputSchemaString))\n );\n\n // Register scalar SQL UDF \n sqlEnv = sqlEnv.registerFunction(\nAPEXCONCAT\n, PureStyleSQLApplication.class, \napex_concat_str\n);\n\n // Converting SQL statement to DAG \n String sql = \nINSERT INTO SALES \n SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n FROM ORDERS \n WHERE ID \n 3 AND PRODUCT LIKE 'paint%'\n;\n sqlEnv.executeSQL(dag, sql);\n }// populateDAG finished\n\n public static String apex_concat_str(String s1, String s2)\n {\n return s1 + s2;\n } \n }\n\n\n\n\nConstructing SQLExecEnvironment\n\n\nThe class \nSQLExecEnvironment\n provides a starting point and a simple way to define metadata needed for running a SQL statement; a ne
w instance of this class is returned by the \ngetEnvironment\n static method. \n\n\n // Creates SQLExecEnvironment instance by using static method getEnvironment\n SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n\n\n\n\nRegistering tables with SQLExecEnvironment\n\n\nNext, we need to register tables which can be used in a query. For this purpose, we can use the \nregisterTable\n method of SQLExecEnvironment.\n\n\n // Register KafkaEndpoint as \nORDERS\n table with Kafka topic and data format as CSV\n sqlEnv = sqlEnv.registerTable( \n \nORDERS\n, \n new KafkaEndpoint(\nlocalhost:9090\n, \n \ninputTopic\n, \n new CSVMessageFormat(inputSchemaString))\n );\n\n // Register FileEndpoint as \nSALES\n table with file path and data format as CSV\n sqlEnv = sqlEnv.registerTable( \n
\nSALES\n, \n new FileEndpoint(\n/tmp/output\n, \n \nout.file\n, \n new CSVMessageFormat(outputSchemaString))\n );\n\n\n\n\"registerTable\"\n method takes the name of the table and an instance of endpoint as parameters. An Endpoint signifies the data storage mechanism and the type of source/destination for the data. These endpoints require different types of configurations and possibly data formats. The data format is defined using an implementation of the \nMessageFormat\n interface; the \nCSVMessageFormat\n implementation can be configured with a schema string as follows:\n\n\n{\n \nseparator\n: \n,\n,\n \nquoteChar\n: \n\\\n,\n \nfields\n: [\n {\n \nname\n: \nRowTime\n,\n \ntype\n: \nDate\n,\n \nconstraints\n: {\n \nformat\n: \ndd/MM/yyyy hh:mm:ss Z\n\n }\n },\n {\n \nname\n: \nid\n,\n
\ntype\n: \nInteger\n\n },\n {\n \nname\n: \nProduct\n,\n \ntype\n: \nString\n\n },\n {\n \nname\n: \nunits\n,\n \ntype\n: \nInteger\n\n }\n ]\n}\n\n\n\n\nThe schema string is a JSON string defining a separator character, a quote character for fields with String type, and a list of fields where, for each field, its name, type and any additional constraints are specified.\n\n\nThe following data endpoints are supported: \n\n\n\n\nKafkaEndpoint\n\n: To define a Kafka Endpoint, we need to specify the Kafka broker (as host:port), topic name and MessageFormat as seen in line 1 in the code above.\n\n\nFileEndpoint\n\n: It needs to be configured with the filesystem path, file name and MessageFormat as in line 2 in the code above. \n\n\nStreamEndpoint\n \n: This allows us to connect existing operator output or input ports to the SQL query as a data source or sink respectively. StreamEndpoint needs the immediate downstream operator's input port or the immediate upstr
eam operator's output port and the field mapping for CSV data or POJO class. This will be explained in detail in the next \nexample\n.\n\n\n\n\nUsing User Defined Functions (UDF) in a SQL query\n\n\nWe can use our own scalar UDF, implemented in Java, in a SQL statement for data manipulation, but first we need to register the function with the execution environment by using the \nregisterFunction\n method.\n\n\n sqlEnv = sqlEnv.registerFunction(\nAPEXCONCAT\n, PureStyleSQLApplication.class, \napex_concat_str\n);\n\n\n\n\nIn the above code, \nregisterFunction\n takes the UDF name to be used in SQL, the Java class which implements the static method, and the name of that method as parameters. \nThe static method \napex_concat_str\n takes two String objects as input parameters from the SQL query.\n\n\n public static String apex_concat_str(String s1, String s2)\n {\n return s1 + s2;\n }\n\n\n\n\nThe scalar UDF \"APEXCONCAT\" that was registered above can be used in SQL as described below. FLOOR and
SUBSTRING are standard SQL scalar functions supported by Apache Calcite.\n\n\nINSERT INTO SALES \n SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n FROM ORDERS \n WHERE ID \n 3 AND PRODUCT LIKE 'paint%'\n\n\n\n\nTo read about all functions and operators supported by Apache Calcite, click \nhere\n.\n\n\nExecuting SQL Query\n\n\nFinally to execute the query we need to use \nexecuteSQL\n function with a DAG and SQL statement as parameters.\n\n\n // Converting SQL statement to DAG \n String sql = \nINSERT INTO SALES \n SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n FROM ORDERS \n WHERE ID \n 3 AND PRODUCT LIKE 'paint%'\n;\n sqlEnv.executeSQL(dag, sql);\n\n\n\n\nWhen executeSQL method is called, the query goes through various phases like conversion to relational algebra, optimization and planning in Calcite to generate Rela
tion Expression Tree. \nThe generated Relation Expression Tree is consumed by Apex SQL and converted to a DAG using operators available in Apache Malhar. In the above example, the ORDERS and SALES tables will be converted to the operators KafkaInputOperator and FileOutputFormatter respectively, paired with the CSVParser formatter in both cases.\n\n\nA \nWHERE\n clause is used in this query; it defines the desired filter for rows and is converted to a \nFilterTransformOperator\n in the DAG. Similarly, the projection defining desired columns is converted into another instance of the \nFilterTransformOperator\n. The DAG created for this application will look like this:\n\n\n\n\n\n\nExample 2: Fusion Style SQL Application\n\n\nAs described in Pure Style SQL application, we can use different data sources as source and sink while developing Apex Applications with Calcite. This example will describe how we can develop Apex application with Apex stream as abstract table for SQL query. Actua
l source code can be found \nhere\n.\n\n\n // Define Kafka Input Operator for reading data from Kafka\n KafkaSinglePortInputOperator kafkaInput = dag.addOperator(\nKafkaInput\n, \n KafkaSinglePortInputOperator.class);\n\n kafkaInput.setInitialOffset(\nEARLIEST\n);\n\n // Add CSVParser\n CsvParser csvParser = dag.addOperator(\nCSVParser\n, CsvParser.class);\n dag.addStream(\nKafkaToCSV\n, kafkaInput.outputPort, csvParser.in);\n\n\n\n\nOnce we define the DAG with KafkaInputOperator and CSVParser, it can parse data from a Kafka topic. Up to this point, this is a regular Apex application without SQL. After this, we can register the output of CSVParser as a table using \nStreamEndpoint\n to run a SQL statement. This way we can develop applications in fusion style where the DAG is part SQL and part regular Apex DAG.\n\n\nThe following code describes how we can define a StreamEndpoint. \n\n\n SQLExecEnvironment sqlEnv = SQLExecEnvironment.get
Environment();\n Map\nString, Class\n fieldMapping = ImmutableMap.\nString, Class\nof(\nRowTime\n, Date.class,\n \nid\n, Integer.class,\n \nProduct\n, String.class,\n \nunits\n, Integer.class);\n sqlEnv = sqlEnv.registerTable(\nFROMCSV\n, new StreamEndpoint(csvParser.out, fieldMapping));\n\n\n\n\nTo read an existing data stream, we need to register it as a table with the SQL execution environment, with the name of the table and a StreamEndpoint. StreamEndpoint can serve as an input table or an output table in SQL. For input table configuration, we need to initialise StreamEndpoint with the immediate upstream operator's output port and fieldMapping or POJO class for the input tuple (as shown above). For output table configuration, we need to initialise StreamEndpoint with the immediate downstream operator's input port and
fieldMapping or POJO class for the output tuple. Once we register StreamEndpoint as a table with a name in the SQL execution environment, it can be used as a table in a SQL statement similar to other endpoints.\n\n\nWhen the executeSQL method is called, the specified SQL is converted to a DAG as described in the previous section. Both examples read CSV data from Kafka. But in the pure style SQL example, the \nKafkaInputOperator\n and \nCSVParser\n in the DAG are created implicitly by the use of KafkaEndpoint, while in the fusion style example, they are explicitly defined as part of the DAG which is then extended with other operators as shown in the image below. \n\n\n\n\nFor all Apex-Calcite integration examples, click \nhere\n. \n\n\nOngoing efforts\n\n\nApache Apex-Calcite integration provides support for basic queries and efforts are underway to extend support for aggregations, sorting and other features using Tumbling, Hopping and Session Windows.\nSupport for JSON, XML and JDBC endpoin
ts is also planned. The goal of this integration is to make developing a streaming application using SQL easy, so that SQL developers don't have to write any Java code at all.",
+ "title": "SQL"
+ },
+ {
+ "location": "/apis/calcite/#apex-calcite-integration",
+ "text": "Apache Calcite is a highly customizable engine for parsing and planning queries on relational data from various data sources; it provides storage-independent optimization of queries and ways to integrate them into other frameworks that would like to take advantage of it and expose SQL capability to their users. For details, please read the Apache Calcite Website . Particularly in SQL on Apex, Calcite processes a query and then creates relational algebra to create processing pipelines. These relational algebra processing pipelines are converted to a DAG with a set of operators to perform business logic on streaming data. The figure above explains how a SQL query gets converted to an Apex DAG. The user-specified query is processed by the Calcite query planner; this involves parsing and optimizing the query to generate a Relation Expression Tree. This Relation Expression Tree is received by Apache Apex\u2019s SQL module to finally convert to an Apex DAG having a series of operators.
One peculiarity of Calcite queries is that the data source and destination need not be RDBMS systems; in the above example, File refers to a file in the filesystem and Kafka to a Kafka message broker. Calcite allows Apex to register table sources and destinations as anything which can return row type results. So a \u201cscan\u201d relational expression gets converted to \u201cKafkaInputOperator + ParseOperator\u201d, the result of which is a series of POJOs reflecting a Row Type. Similarly, the \u201cinsert\u201d Relational Expression is translated to \u201cFormatOperator + FileOutputOperator\u201d. For more details about the integration, click here .",
+ "title": "Apex-Calcite Integration"
+ },
+ {
+ "location": "/apis/calcite/#sql-apis-for-apache-apex",
+ "text": "Listed below are the Java APIs which can be used by SQL/Apex users to create a DAG in the implementation of the populateDAG method of the StreamingApplication interface. API Description SQLExecEnvironment.getEnvironment() Creates a new SQL execution environment SQLExecEnvironment.registerTable(tableName, endpointInstance) Registers a new abstract table with the existing environment. endpointInstance is an object of type Endpoint which defines a table. SQLExecEnvironment.registerFunction(sqlFunctionName, holderClass, staticFunctionName) Registers a new User Defined Scalar function SQLExecEnvironment.executeSQL(dag, sqlStatement) Creates a DAG for a particular SQL statement Usage of the above APIs is described in detail in the following sections.",
+ "title": "SQL APIs for Apache Apex"
+ },
+ {
+ "location": "/apis/calcite/#example-1-pure-style-sql-application",
+ "text": "With Apache Calcite Integration, you can use SQL queries across different data sources and provide UDFs (User Defined Functions) as per your business logic. This example will use a Kafka topic as the source and an HDFS file as the destination.\nThe following application code will be used to explain the APIs. Actual source code can be found here .  public class PureStyleSQLApplication implements StreamingApplication\n {\n @Override\n public void populateDAG(DAG dag, Configuration conf)\n {\n // Create new SQLExecEnvironment\n SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n\n // This is a string that defines a schema and is discussed in more detail in Registering tables with SQLExecEnvironment section \n String inputSchemaString = ... ;\n\n // similar to inputSchemaString, we also need to define outputSchemaString\n String outputSchemaString = ... ;\n\n // Register KafkaEndpoint as ORDERS table with
kafka topic and data format as CSV\n sqlEnv = sqlEnv.registerTable( \n ORDERS , \n new KafkaEndpoint( localhost:9090 , \n inputTopic , \n new CSVMessageFormat(inputSchemaString))\n );\n\n // Register FileEndpoint as SALES table with file path and data format as CSV\n sqlEnv = sqlEnv.registerTable( \n SALES , \n new FileEndpoint( /tmp/output , \n out.file , \n new CSVMessageFormat(outputSchemaString))\n );\n\n // Register scalar SQL UDF \n sqlEnv = sqlEnv.registerFunction( APEXCONCAT , PureStyleSQLApplication.class, apex_concat_str );\n\n
// Converting SQL statement to DAG \n String sql = INSERT INTO SALES \n SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n FROM ORDERS \n WHERE ID 3 AND PRODUCT LIKE 'paint%' ;\n sqlEnv.executeSQL(dag, sql);\n }// populateDAG finished\n\n public static String apex_concat_str(String s1, String s2)\n {\n return s1 + s2;\n } \n }",
+ "title": "Example 1: Pure Style SQL Application"
+ },
+ {
+ "location": "/apis/calcite/#constructing-sqlexecenvironment",
+ "text": "The class SQLExecEnvironment provides a starting point and a simple way to define metadata needed for running a SQL statement; a new instance of this class is returned by the getEnvironment static method. // Creates SQLExecEnvironment instance by using static method getEnvironment\n SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();",
+ "title": "Constructing SQLExecEnvironment"
+ },
+ {
+ "location": "/apis/calcite/#registering-tables-with-sqlexecenvironment",
+ "text": "Next, we need to register tables which can be used in a query. For this purpose, we can use the registerTable method from SQLExecEnvironment. // Register KafkaEndpoint as ORDERS table with Kafka topic and data format as CSV\n sqlEnv = sqlEnv.registerTable(\n \"ORDERS\",\n new KafkaEndpoint(\"localhost:9090\",\n \"inputTopic\",\n new CSVMessageFormat(inputSchemaString))\n );\n\n // Register FileEndpoint as SALES table with file path and data format as CSV\n sqlEnv = sqlEnv.registerTable(\n \"SALES\",\n new FileEndpoint(\"/tmp/output\",\n \"out.file\",\n new CSVMessageFormat(outputSchemaString))\n ); The \"registerTable\" method takes the name of the
table and an instance of an endpoint as parameters. An endpoint signifies the data storage mechanism and the type of source/destination for the data. These endpoints require different types of configurations and possibly data formats. The data format is defined using an implementation of the MessageFormat interface; the CSVMessageFormat implementation can be configured with a schema string as follows: {\n \"separator\": \",\",\n \"quoteChar\": \"\\\"\",\n \"fields\": [\n {\n \"name\": \"RowTime\",\n \"type\": \"Date\",\n \"constraints\": {\n \"format\": \"dd/MM/yyyy hh:mm:ss Z\"\n }\n },\n {\n \"name\": \"id\",\n \"type\": \"Integer\"\n },\n {\n \"name\": \"Product\",\n \"type\": \"String\"\n },\n {\n \"name\": \"units\",\n \"type\": \"Integer\"\n }\n ]\n} The schema string is a JSON string defining a separator character, a quote character for fields with String type and a list of fields where, for each field, its name, t
ype and any additional constraints are specified. The following data endpoints are supported: KafkaEndpoint \n: To define a Kafka endpoint we need to specify the Kafka broker (as host:port), topic name and MessageFormat as seen in line 1 in the code above. FileEndpoint \n: It needs to be configured with the filesystem path, file name and MessageFormat as in line 2 in the code above. StreamEndpoint \n: This allows us to connect an existing operator output or input port to the SQL query as a data source or sink respectively. StreamEndpoint needs the immediate downstream operator's input port or the immediate upstream operator's output port and the field mapping for CSV data or a POJO class. This will be explained in detail in the next example .",
+ "title": "Registering tables with SQLExecEnvironment"
+ },
+ {
+ "location": "/apis/calcite/#using-user-defined-functions-udf-in-a-sql-query",
+ "text": "We can use our own scalar UDF, implemented in Java, in a SQL statement for data manipulation but first, we need to register the function with the execution environment by using the registerFunction method. sqlEnv = sqlEnv.registerFunction(\"APEXCONCAT\", PureStyleSQLApplication.class, \"apex_concat_str\"); In the above code, registerFunction takes the UDF name to be used in SQL, the Java class which implements the static method and the name of that method as parameters. \nThe static method apex_concat_str takes two String objects as input parameters from the SQL query. public static String apex_concat_str(String s1, String s2)\n {\n return s1 + s2;\n } The scalar UDF \"APEXCONCAT\" that was registered above can be used in SQL as described below. FLOOR and SUBSTRING are standard SQL scalar functions supported by Apache Calcite. INSERT INTO SALES \n SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \n FR
OM ORDERS \n WHERE ID > 3 AND PRODUCT LIKE 'paint%' To read about all functions and operators supported by Apache Calcite, click here .",
+ "title": "Using User Defined Functions (UDF) in a SQL query"
+ },
+ {
+ "location": "/apis/calcite/#executing-sql-query",
+ "text": "Finally, to execute the query, we need to use the executeSQL method with a DAG and a SQL statement as parameters. // Converting SQL statement to DAG\n String sql = \"INSERT INTO SALES \" +\n \"SELECT STREAM ROWTIME, FLOOR(ROWTIME TO DAY), APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) \" +\n \"FROM ORDERS \" +\n \"WHERE ID > 3 AND PRODUCT LIKE 'paint%'\";\n sqlEnv.executeSQL(dag, sql); When the executeSQL method is called, the query goes through various phases like conversion to relational algebra, optimization and planning in Calcite to generate a Relational Expression Tree. \nThe generated Relational Expression Tree is consumed by Apex SQL and converted to a DAG using operators available in Apache Malhar. In the above example, the ORDERS and SALES tables will be converted to the operators KafkaInputOperator and FileOutputFormatter respectively, paired with the CSVParser formatter in both cases. A WHERE clause is used in this query; it de
fines the desired filter for rows and is converted to a FilterTransformOperator in the DAG. Similarly, the projection defining the desired columns is converted into another instance of the FilterTransformOperator . The DAG created for this application will look like this:",
+ "title": "Executing SQL Query"
+ },
+ {
+ "location": "/apis/calcite/#example-2-fusion-style-sql-application",
+ "text": "As described in the Pure Style SQL application, we can use different data sources as source and sink while developing Apex applications with Calcite. This example will describe how we can develop an Apex application with an Apex stream as an abstract table for a SQL query. Actual source code can be found here . // Define Kafka Input Operator for reading data from Kafka\n KafkaSinglePortInputOperator kafkaInput = dag.addOperator(\"KafkaInput\",\n KafkaSinglePortInputOperator.class);\n\n kafkaInput.setInitialOffset(\"EARLIEST\");\n\n // Add CSVParser\n CsvParser csvParser = dag.addOperator(\"CSVParser\", CsvParser.class);\n dag.addStream(\"KafkaToCSV\", kafkaInput.outputPort, csvParser.in); Once we define the DAG with KafkaInputOperator and CSVParser, it can parse data from a Kafka topic. Up to this point, this is a regular Apex application without SQL. After this, we can register the output of CSVParser as a table using Str
eamEndpoint to run a SQL statement. This way we can develop applications in fusion style where the DAG is part SQL and part regular Apex DAG. The following code will describe how we can define a StreamEndpoint. SQLExecEnvironment sqlEnv = SQLExecEnvironment.getEnvironment();\n Map<String, Class> fieldMapping = ImmutableMap.<String, Class>of(\"RowTime\", Date.class,\n \"id\", Integer.class,\n \"Product\", String.class,\n \"units\", Integer.class);\n sqlEnv = sqlEnv.registerTable(\"FROMCSV\", new StreamEndpoint(csvParser.out, fieldMapping)); To read an existing data stream, we need to register it as a table with the SQL execution environment, providing the name of the table and a StreamEndpoint. StreamEndpoint can serve as an input table or an output table in SQL. For input table configuration we need to initialise StreamEndpoint w
ith the immediate upstream operator's output port and a field mapping or POJO class for the input tuple (as shown above). For output table configuration, we need to initialise StreamEndpoint with the immediate downstream operator's input port and a field mapping or POJO class for the output tuple. Once we register a StreamEndpoint as a table with a name in the SQL execution environment, it can be used as a table in a SQL statement similar to other endpoints. When the executeSQL method is called, the specified SQL is converted to a DAG as described in the previous section. Both examples read CSV data from Kafka. But in the pure style SQL example the KafkaInputOperator and CSVParser in the DAG are created implicitly by the use of the KafkaEndpoint while in the fusion style example, they are explicitly defined as part of the DAG which is then extended with other operators as shown in the image below. For all Apex-Calcite integration examples, click here .",
+ "title": "Example 2: Fusion Style SQL Application"
+ },
+ {
+ "location": "/apis/calcite/#ongoing-efforts",
+ "text": "Apache Apex-Calcite integration provides support for basic queries and efforts are underway to extend support for aggregations, sorting and other features using Tumbling, Hopping and Session Windows.\nSupport for JSON, XML and JDBC endpoints is also planned. The goal of this integration is to make developing a streaming application using SQL easy, so that SQL developers don't have to write any Java code at all.",
+ "title": "Ongoing efforts"
+ },
+ {
+ "location": "/operators/block_reader/",
+ "text": "Block Reader\n\n\nThis is a scalable operator that reads and parses blocks of data sources into records. A data source can be a file or a message bus that contains records and a block defines a chunk of data in the source by specifying the block offset and the length of the source belonging to the block. \n\n\nWhy is it needed?\n\n\nA Block Reader is needed to parallelize reading and parsing of a single data source, for example a file. Simple parallelism of reading data sources can be achieved by multiple partitions reading different sources of the same type (for files see \nAbstractFileInputOperator\n) but Block Reader partitions can read blocks of the same source in parallel and parse them for records ensuring that no record is duplicated or missed.\n\n\nClass Diagram\n\n\n\n\nAbstractBlockReader\n\n\nThis is the abstract implementation that serves as the base for different types of data sources. It defines how block metadata is processed. The flow diagram below desc
ribes the processing of block metadata.\n\n\n\n\nPorts\n\n\n\n\n\n\nblocksMetadataInput: input port on which block metadata are received.\n\n\n\n\n\n\nblocksMetadataOutput: output port on which block metadata are emitted if the port is connected. This port is useful when a downstream operator that receives records from the block reader may also be interested to know the details of the corresponding blocks.\n\n\n\n\n\n\nmessages: output port on which tuples of type \ncom.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord\n are emitted. This class encapsulates a \nrecord\n and the \nblockId\n of the corresponding block.\n\n\n\n\n\n\nreaderContext\n\n\nThis is one of the most important fields in the block reader. It is of type \ncom.datatorrent.lib.io.block.ReaderContext\n and is responsible for fetching bytes that make a record. It also lets the reader know how many total bytes were consumed, which may not be equal to the total bytes in a record because consumed bytes also include
 bytes for the record delimiter which may not be a part of the actual record.\n\n\nOnce the reader creates an input stream for the block (or uses the previously opened stream if the current block is the successor of the previous block) it initializes the reader context by invoking \nreaderContext.initialize(stream, blockMetadata, consecutiveBlock);\n. The initialize method is where any implementation of \nReaderContext\n can perform all the operations which have to be executed just before reading the block or create states which are used during the lifetime of reading the block.\n\n\nOnce the initialization is done, \nreaderContext.next()\n is called repeatedly until it returns \nnull\n. It is left to the \nReaderContext\n implementations to decide when a block is completely processed. In cases when a record is split across adjacent blocks, the reader context may decide to read ahead of the current block boundary to completely fetch the split record (examples- \nLineReaderContext\n and \nReadAhea
dLineReaderContext\n). In other cases when there isn't a possibility of a split record (example- \nFixedBytesReaderContext\n), it returns \nnull\n immediately when the block boundary is reached. The return type of \nreaderContext.next()\n is \ncom.datatorrent.lib.io.block.ReaderContext.Entity\n which is just a wrapper for a \nbyte[]\n that represents the record and the total bytes used in fetching the record.\n\n\nAbstract methods\n\n\n\n\n\n\nSTREAM setupStream(B block)\n: creating a stream for a block is dependent on the type of source which is not known to AbstractBlockReader. Sub-classes which deal with a specific data source provide this implementation.\n\n\n\n\n\n\nR convertToRecord(byte[] bytes)\n: this converts the array of bytes into the actual instance of the record type.\n\n\n\n\n\n\nAuto-scalability\n\n\nThe block reader can auto-scale, that is, depending on the backlog (total number of all the blocks which are waiting in the \nblocksMetadataInput\n port queue of all partition
s) it can create more partitions or reduce them. Details are discussed in the last section which covers the \npartitioner and stats-listener\n.\n\n\nConfiguration\n\n\n\n\nmaxReaders\n: when auto-scaling is enabled, this controls the maximum number of block reader partitions that can be created.\n\n\nminReaders\n: when auto-scaling is enabled, this controls the minimum number of block reader partitions that should always exist.\n\n\ncollectStats\n: this enables or disables auto-scaling. When it is set to \ntrue\n the stats (number of blocks in the queue) are collected and this triggers partitioning; otherwise auto-scaling is disabled.\n\n\nintervalMillis\n: when auto-scaling is enabled, this specifies the interval at which the reader will trigger the logic of computing the backlog and auto-scale.\n\n\n\n\n AbstractFSBlockReader\n\n\nThis abstract implementation deals with files. Different types of file systems that are implementations of \norg.apache.hadoop.fs.FileSystem\n are suppo
rted. The user can override the \ngetFSInstance()\n method to create an instance of a specific \nFileSystem\n. By default, the filesystem instance is created from the filesystem URI that comes from the default Hadoop configuration.\n\n\nprotected FileSystem getFSInstance() throws IOException\n{\n return FileSystem.newInstance(configuration);\n}\n\n\n\n\nIt uses this filesystem instance to set up a stream of type \norg.apache.hadoop.fs.FSDataInputStream\n to read the block.\n\n\n@Override\nprotected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException\n{\n return fs.open(new Path(block.getFilePath()));\n}\n\n\n\n\nAll the ports and configurations are derived from the super class. It doesn't provide an implementation of the \nconvertToRecord(byte[] bytes)\n method, which is delegated to concrete sub-classes.\n\n\nExample Application\n\n\nThis simple DAG demonstrates how any concrete implementation of \nAbstractFSBlockReader\n can be plugged into an application. \
n\n\n\n\nIn the above application, the file splitter creates block metadata for files which are sent to the block reader. Partitions of the block reader parse the file blocks for records which are filtered, transformed and then persisted to a file (created per block). Therefore the block reader is parallel partitioned with the two downstream operators - filter/converter and record output operator. The code which implements this DAG is below.\n\n\npublic class ExampleApplication implements StreamingApplication\n{\n @Override\n public void populateDAG(DAG dag, Configuration configuration)\n {\n FileSplitterInput input = dag.addOperator(\"File-splitter\", new FileSplitterInput());\n //any concrete implementation of AbstractFSBlockReader based on the use-case can be added here.\n LineReader blockReader = dag.addOperator(\"Block-reader\", new LineReader());\n Filter filter = dag.addOperator(\"Filter\", new Filter());\n RecordOutputOperator recordOutputOperator = dag.addOperator(\"Re
cord-writer\", new RecordOutputOperator());\n\n dag.addStream(\"file-block metadata\", input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n dag.addStream(\"records\", blockReader.messages, filter.input);\n dag.addStream(\"filtered-records\", filter.output, recordOutputOperator.input);\n }\n\n /**\n * Concrete implementation of {@link AbstractFSBlockReader} for which a record is a line in the file.\n */\n public static class LineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<String>\n {\n\n @Override\n protected String convertToRecord(byte[] bytes)\n {\n return new String(bytes);\n }\n }\n\n /**\n * Considers any line starting with a '.' as invalid. Emits the valid records.\n */\n public static class Filter extends BaseOperator\n {\n public final transient DefaultOutputPort<AbstractBlockReader.ReaderRecord<String>> output = new DefaultOutputPort<>();\n public final transient DefaultInputPort<AbstractB
lockReader.ReaderRecord<String>> input = new DefaultInputPort<AbstractBlockReader.ReaderRecord<String>>()\n {\n @Override\n public void process(AbstractBlockReader.ReaderRecord<String> stringRecord)\n {\n //filter records and transform\n //if the string starts with a '.' ignore the string.\n if (!StringUtils.startsWith(stringRecord.getRecord(), \".\")) {\n output.emit(stringRecord);\n }\n }\n };\n }\n\n /**\n * Persists the valid records to corresponding block files.\n */\n public static class RecordOutputOperator extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<String>>\n {\n @Override\n protected String getFileName(AbstractBlockReader.ReaderRecord<String> tuple)\n {\n return Long.toHexString(tuple.getBlockId());\n }\n\n @Override\n protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<String> tuple)\n {\n return tuple.getRecord().getBytes
();\n }\n }\n}\n\n\n\n\nConfiguration to parallel partition the block reader with its downstream operators.\n\n\n <property>\n <name>dt.operator.Filter.port.input.attr.PARTITION_PARALLEL</name>\n <value>true</value>\n </property>\n <property>\n <name>dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL</name>\n <value>true</value>\n </property>\n\n\n\n\nAbstractFSReadAheadLineReader\n\n\nThis extension of \nAbstractFSBlockReader\n parses lines from a block and binds the \nreaderContext\n field to an instance of \nReaderContext.ReadAheadLineReaderContext\n.\n\n\nIt is abstract because it doesn't provide an implementation of \nconvertToRecord(byte[] bytes)\n since the user may want to convert the bytes that make a line into some other type. \n\n\nReadAheadLineReaderContext\n\n\nIn order to handle a line split across adjacent blocks, ReadAheadLineReaderContext always reads beyond the block boundary and ignores the bytes till the first end
-of-line character of all the blocks except the first block of the file. This ensures that no line is missed or incomplete.\n\n\nThis is one of the most common ways of handling a split record. It doesn't require any further information to decide if a line is complete. However, the cost of this consistent way to handle a line split is that it always reads from the next block.\n\n\nAbstractFSLineReader\n\n\nSimilar to \nAbstractFSReadAheadLineReader\n, this too parses lines from a block. However, it binds the \nreaderContext\n field to an instance of \nReaderContext.LineReaderContext\n.\n\n\nLineReaderContext\n\n\nThis handles the line split differently from \nReadAheadLineReaderContext\n. It doesn't always read from the next block. If the end of the last line is aligned with the block boundary then it stops processing the block. It does read from the next block when the boundaries are not aligned, that is, the last line extends beyond the block boundary. The result of this is an inconsi
stency in reading the next block.\n\n\nWhen the boundary of the last line of the previous block was aligned with its block, then the first line of the current block is a valid line. However, in the other case the bytes from the block start offset to the first end-of-line character should be ignored. Therefore, this means that any record formed by this reader context has to be validated. For example, if the lines are of fixed size then the size of each record can be validated, or if each line begins with a special field then that knowledge can be used to check if a record is complete.\n\n\nIf the validation of completeness fails for a line then \nconvertToRecord(byte[] bytes)\n should return null.\n\n\nFSSliceReader\n\n\nA concrete extension of \nAbstractFSBlockReader\n that reads fixed-size \nbyte[]\n from a block and emits the byte array wrapped in \ncom.datatorrent.netlet.util.Slice\n.\n\n\nThis operator binds the \nreaderContext\n to an instance of \nReaderContext.FixedBytesReaderCon
text\n.\n\n\nFixedBytesReaderContext\n\n\nThis implementation of \nReaderContext\n never reads beyond a block boundary, which can result in the last \nbyte[]\n of a block being of a shorter length than the rest of the records.\n\n\nConfiguration\n\n\nreaderContext.length\n: length of each record. By default, this is initialized to the default HDFS block size.\n\n\nPartitioner and StatsListener\n\n\nThe logical instance of the block reader acts as the Partitioner (unless a custom partitioner is set using the operator attribute - \nPARTITIONER\n) as well as a StatsListener. This is because \n\nAbstractBlockReader\n implements both the \ncom.datatorrent.api.Partitioner\n and \ncom.datatorrent.api.StatsListener\n interfaces and provides an implementation of \ndefinePartitions(...)\n and \nprocessStats(...)\n which make it auto-scalable.\n\n\nprocessStats \n\n\nThe application master invokes the \nResponse processStats(BatchedOperatorStats stats)\n method on the logical instance with the
 stats (\ntuplesProcessedPSMA\n, \ntuplesEmittedPSMA\n, \nlatencyMA\n, etc.) of each partition. The data which this operator is interested in is the \nqueueSize\n of the input port \nblocksMetadataInput\n.\n\n\nUsually the \nqueueSize\n of an input port gives the count of waiting control tuples plus data tuples. However, if a stats listener is interested only in the count of data tuples then that can be expressed by annotating the class with \n@DataQueueSize\n. In this case \nAbstractBlockReader\n itself is the \nStatsListener\n which is why it is annotated with \n@DataQueueSize\n.\n\n\nThe logical instance caches the queue size per partition and at regular intervals (configured by \nintervalMillis\n) sums these values to find the total backlog which is then used to decide whether re-partitioning is needed. The flow diagram below describes this logic.\n\n\n\n\nThe goal of this logic is to create as many partitions within bounds (see \nmaxReaders\n and \nminReaders\n above) to quickly
 reduce this backlog, or if the backlog is small then remove any idle partitions.\n\n\ndefinePartitions\n\n\nBased on the \nrepartitionRequired\n field of the \nResponse\n object which is returned by the \nprocessStats\n method, the application master invokes \n\n\nCollection<Partition<AbstractBlockReader<...>>> definePartitions(Collection<Partition<AbstractBlockReader<...>>> partitions, PartitioningContext context)\n\n\n\n\non the logical instance which is also the partitioner instance. The implementation calculates the difference between the required partitions and the existing count of partitions. If this difference is negative, then an equivalent number of partitions is removed; otherwise new partitions are created. \n\n\nPlease note that auto-scaling can be disabled by setting \ncollectStats\n to \nfalse\n. If the use-case requires only static partitioning, then that can be achieved by setting \nStatelessPartitioner\n as the operator attribute- \nPARTITIONER\n on the block reader.",
+ "title": "Block Reader"
+ },
+ {
+ "location": "/operators/block_reader/#block-reader",
+ "text": "This is a scalable operator that reads and parses blocks of data sources into records. A data source can be a file or a message bus that contains records and a block defines a chunk of data in the source by specifying the block offset and the length of the source belonging to the block.",
+ "title": "Block Reader"
+ },
+ {
+ "location": "/operators/block_reader/#why-is-it-needed",
+ "text": "A Block Reader is needed to parallelize reading and parsing of a single data source, for example a file. Simple parallelism of reading data sources can be achieved by multiple partitions reading different sources of the same type (for files see AbstractFileInputOperator ) but Block Reader partitions can read blocks of the same source in parallel and parse them for records ensuring that no record is duplicated or missed.",
+ "title": "Why is it needed?"
+ },
+ {
+ "location": "/operators/block_reader/#class-diagram",
+ "text": "",
+ "title": "Class Diagram"
+ },
+ {
+ "location": "/operators/block_reader/#abstractblockreader",
+ "text": "This is the abstract implementation that serves as the base for different types of data sources. It defines how block metadata is processed. The flow diagram below describes the processing of block metadata.",
+ "title": "AbstractBlockReader"
+ },
+ {
+ "location": "/operators/block_reader/#ports",
+ "text": "blocksMetadataInput: input port on which block metadata are received. blocksMetadataOutput: output port on which block metadata are emitted if the port is connected. This port is useful when a downstream operator that receives records from block reader may also be interested to know the details of the corresponding blocks. messages: output port on which tuples of type com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord are emitted. This class encapsulates a record and the blockId of the corresponding block.",
+ "title": "Ports"
+ },
+ {
+ "location": "/operators/block_reader/#readercontext",
+ "text": "This is one of the most important fields in the block reader. It is of type com.datatorrent.lib.io.block.ReaderContext and is responsible for fetching bytes that make a record. It also lets the reader know how many total bytes were consumed, which may not be equal to the total bytes in a record because consumed bytes also include bytes for the record delimiter which may not be a part of the actual record. Once the reader creates an input stream for the block (or uses the previously opened stream if the current block is the successor of the previous block) it initializes the reader context by invoking readerContext.initialize(stream, blockMetadata, consecutiveBlock); . The initialize method is where any implementation of ReaderContext can perform all the operations which have to be executed just before reading the block or create states which are used during the lifetime of reading the block. Once the initialization is done, readerContext.next() is called repeatedl
y until it returns null . It is left to the ReaderContext implementations to decide when a block is completely processed. In cases when a record is split across adjacent blocks, the reader context may decide to read ahead of the current block boundary to completely fetch the split record (examples- LineReaderContext and ReadAheadLineReaderContext ). In other cases when there isn't a possibility of a split record (example- FixedBytesReaderContext ), it returns null immediately when the block boundary is reached. The return type of readerContext.next() is com.datatorrent.lib.io.block.ReaderContext.Entity which is just a wrapper for a byte[] that represents the record and the total bytes used in fetching the record.",
+ "title": "readerContext"
+ },
+ {
+ "location": "/operators/block_reader/#abstract-methods",
+ "text": "STREAM setupStream(B block) : creating a stream for a block is dependent on the type of source which is not known to AbstractBlockReader. Sub-classes which deal with a specific data source provide this implementation. R convertToRecord(byte[] bytes) : this converts the array of bytes into the actual instance of record type.",
+ "title": "Abstract methods"
+ },
+ {
+ "location": "/operators/block_reader/#auto-scalability",
+ "text": "Block reader can auto-scale, that is, depending on the backlog (total number of all the blocks which are waiting in the blocksMetadataInput port queue of all partitions) it can create more partitions or reduce them. Details are discussed in the last section which covers the partitioner and stats-listener .",
+ "title": "Auto-scalability"
+ },
+ {
+ "location": "/operators/block_reader/#configuration",
+ "text": "maxReaders : when auto-scaling is enabled, this controls the maximum number of block reader partitions that can be created. minReaders : when auto-scaling is enabled, this controls the minimum number of block reader partitions that should always exist. collectStats : this enables or disables auto-scaling. When it is set to true the stats (number of blocks in the queue) are collected and this triggers partitioning; otherwise auto-scaling is disabled. intervalMillis : when auto-scaling is enabled, this specifies the interval at which the reader will trigger the logic of computing the backlog and auto-scale.",
+ "title": "Configuration"
+ },
+ {
+ "location": "/operators/block_reader/#example-application",
+ "text": "This simple DAG demonstrates how any concrete implementation of AbstractFSBlockReader can be plugged into an application. In the above application, the file splitter creates block metadata for files which are sent to the block reader. Partitions of the block reader parse the file blocks for records which are filtered, transformed and then persisted to a file (created per block). Therefore the block reader is parallel partitioned with the two downstream operators - filter/converter and record output operator. The code which implements this DAG is below. public class ExampleApplication implements StreamingApplication\n{\n @Override\n public void populateDAG(DAG dag, Configuration configuration)\n {\n FileSplitterInput input = dag.addOperator(\"File-splitter\", new FileSplitterInput());\n //any concrete implementation of AbstractFSBlockReader based on the use-case can be added here.\n LineReader blockReader = dag.addOperator(\"Block-reader\", new LineReader());\n
 Filter filter = dag.addOperator(\"Filter\", new Filter());\n RecordOutputOperator recordOutputOperator = dag.addOperator(\"Record-writer\", new RecordOutputOperator());\n\n dag.addStream(\"file-block metadata\", input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n dag.addStream(\"records\", blockReader.messages, filter.input);\n dag.addStream(\"filtered-records\", filter.output, recordOutputOperator.input);\n }\n\n /**\n * Concrete implementation of {@link AbstractFSBlockReader} for which a record is a line in the file.\n */\n public static class LineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader<String>\n {\n\n @Override\n protected String convertToRecord(byte[] bytes)\n {\n return new String(bytes);\n }\n }\n\n /**\n * Considers any line starting with a '.' as invalid. Emits the valid records.\n */\n public static class Filter extends BaseOperator\n {\n public final transient DefaultOutputPort<AbstractBlockRea
der.ReaderRecord<String>> output = new DefaultOutputPort<>();\n public final transient DefaultInputPort<AbstractBlockReader.ReaderRecord<String>> input = new DefaultInputPort<AbstractBlockReader.ReaderRecord<String>>()\n {\n @Override\n public void process(AbstractBlockReader.ReaderRecord<String> stringRecord)\n {\n //filter records and transform\n //if the string starts with a '.' ignore the string.\n if (!StringUtils.startsWith(stringRecord.getRecord(), \".\")) {\n output.emit(stringRecord);\n }\n }\n };\n }\n\n /**\n * Persists the valid records to corresponding block files.\n */\n public static class RecordOutputOperator extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<String>>\n {\n @Override\n protected String getFileName(AbstractBlockReader.ReaderRecord<String> tuple)\n {\n return Long.toHexString(tuple.getBlockId());\n }\n\n @Override\n protected byte[] getBytesForTup
le(AbstractBlockReader.ReaderRecord<String> tuple)\n {\n return tuple.getRecord().getBytes();\n }\n }\n} Configuration to parallel partition the block reader with its downstream operators. <property>\n <name>dt.operator.Filter.port.input.attr.PARTITION_PARALLEL</name>\n <value>true</value>\n </property>\n <property>\n <name>dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL</name>\n <value>true</value>\n </property>",
+ "title": "Example Application"
+ },
+ {
+ "location": "/operators/block_reader/#abstractfsreadaheadlinereader",
+ "text": "This extension of AbstractFSBlockReader parses lines from a block and binds the readerContext field to an instance of ReaderContext.ReadAheadLineReaderContext . It is abstract because it doesn't provide an implementation of convertToRecord(byte[] bytes) since the user may want to convert the bytes that make a line into some other type.",
+ "title": "AbstractFSReadAheadLineReader"
+ },
+ {
+ "location": "/operators/block_reader/#readaheadlinereadercontext",
+ "text": "In order to handle a line split across adjacent blocks, ReadAheadLineReaderContext always reads beyond the block boundary and ignores the bytes till the first end-of-line character of all the blocks except the first block of the file. This ensures that no line is missed or incomplete. This is one of the most common ways of handling a split record. It doesn't require any further information to decide if a line is complete. However, the cost of this consistent way to handle a line split is that it always reads from the next block.",
+ "title": "ReadAheadLineReaderContext"
+ },
+ {
+ "location": "/operators/block_reader/#abstractfslinereader",
+ "text": "Similar to AbstractFSReadAheadLineReader , even this parses lines from a block. However, it binds the readerContext field to an instance of ReaderContext.LineReaderContext .",
+ "title": "AbstractFSLineReader"
+ },
+ {
+ "location": "/operators/block_reader/#linereadercontext",
+ "text": "This handles the line split differently from ReadAheadLineReaderContext . It doesn't always read from the next block. If the end of the last line is aligned with the block boundary then it stops processing the block. It does read from the next block when the boundaries are not aligned, that is, last line extends beyond the block boundary. The result of this is an inconsistency in reading the next block. When the boundary of the last line of the previous block was aligned with its block, then the first line of the current block is a valid line. However, in the other case the bytes from the block start offset to the first end-of-line character should be ignored. Therefore, this means that any record formed by this reader context has to be validated. For example, if the lines are of fixed size then size of each record can be validated or if each line begins with a special field then that knowledge can be used to check if a record is complete. If the validations
of completeness fails for a line then convertToRecord(byte[] bytes) should return null.",
+ "title": "LineReaderContext"
+ },
+ {
+ "location": "/operators/block_reader/#fsslicereader",
+ "text": "A concrete extension of AbstractFSBlockReader that reads fixed-size byte[] from a block and emits the byte array wrapped in com.datatorrent.netlet.util.Slice . This operator binds the readerContext to an instance of ReaderContext.FixedBytesReaderContext .",
+ "title": "FSSliceReader"
+ },
+ {
+ "location": "/operators/block_reader/#fixedbytesreadercontext",
+ "text": "This implementation of ReaderContext never reads beyond a block boundary which can result in the last byte[] of a block to be of a shorter length than the rest of the records.",
+ "title": "FixedBytesReaderContext"
+ },
+ {
+ "location": "/operators/block_reader/#configuration_1",
+ "text": "readerContext.length : length of each record. By default, this is initialized to the default hdfs block size.",
+ "title": "Configuration"
+ },
+ {
+ "location": "/operators/block_reader/#partitioner-and-statslistener",
+ "text": "The logical instance of the block reader acts as the Partitioner (unless a custom partitioner is set using the operator attribute - PARTITIONER ) as well as a StatsListener. This is because the AbstractBlockReader implements both the com.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener interfaces and provides an implementation of definePartitions(...) and processStats(...) which make it auto-scalable.",
+ "title": "Partitioner and StatsListener"
+ },
+ {
+ "location": "/operators/block_reader/#processstats",
+ "text": "The application master invokes Response processStats(BatchedOperatorStats stats) method on the logical instance with the stats ( tuplesProcessedPSMA , tuplesEmittedPSMA , latencyMA , etc.) of each partition. The data which this operator is interested in is the queueSize of the input port blocksMetadataInput . Usually the queueSize of an input port gives the count of waiting control tuples plus data tuples. However, if a stats listener is interested only in the count of data tuples then that can be expressed by annotating the class with @DataQueueSize . In this case AbstractBlockReader itself is the StatsListener which is why it is annotated with @DataQueueSize . The logical instance caches the queue size per partition and at regular intervals (configured by intervalMillis ) sums these values to find the total backlog which is then used to decide whether re-partitioning is needed. The flow-diagram below describes this logic. The goal of this l
ogic is to create as many partitions within bounds (see maxReaders and minReaders above) to quickly reduce this backlog or if the backlog is small then remove any idle partitions.",
+ "title": "processStats "
+ },
+ {
+ "location": "/operators/block_reader/#definepartitions",
+ "text": "Based on the repartitionRequired field of the Response object which is returned by processStats method, the application master invokes Collection Partition AbstractBlockReader ... definePartitions(Collection Partition AbstractBlockReader ... partitions, PartitioningContext context) on the logical instance which is also the partitioner instance. The implementation calculates the difference between required partitions and the existing count of partitions. If this difference is negative, then equivalent number of partitions are removed otherwise new partitions are created. Please note auto-scaling can be disabled by setting collectStats to false . If the use-case requires only static partitioning, then that can be achieved by setting StatelessPartitioner as the operator attribute- PARTITIONER on the block reader.",
+ "title": "definePartitions"
+ },
+ {
+ "location": "/operators/csvformatter/",
+ "text": "CsvFormatter\n\n\nOperator Objective\n\n\nThis operator receives a POJO (\nPlain Old Java Object\n) as an incoming tuple, converts the data in \nthe incoming POJO to a custom delimited string and emits the delimited string.\n\n\nCsvFormatter supports schema definition as a JSON string. \n\n\nCsvFormatter does not hold any state and is \nidempotent\n, \nfault-tolerant\n and \nstatically/dynamically partitionable\n.\n\n\nOperator Information\n\n\n\n\nOperator location: \nmalhar-contrib\n\n\nAvailable since: \n3.2.0\n\n\nOperator state: \nEvolving\n\n\nJava Packages:\n\n\nOperator: \ncom.datatorrent.contrib.formatter.CsvFormatter\n\n\n\n\n\n\n\n\nProperties, Attributes and Ports\n\n\nProperties of POJOEnricher\n\n\n\n\n\n\n\n\nProperty\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\nDefault Value\n\n\n\n\n\n\n\n\n\n\nschema\n\n\nContents of the schema.Schema is specified in a json format.\n\n\nString\n\n\nYes\n\n\nN/A\n\n\n\n\n\n\n\n\nPlatform Attributes that influe
nces operator behavior\n\n\n\n\n\n\n\n\nAttribute\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\n\n\n\n\n\n\n\n\nin.TUPLE_CLASS\n\n\nTUPLE_CLASS attribute on input port which tells operator the class of POJO which will be incoming\n\n\nClass or FQCN\n\n\nYes\n\n\n\n\n\n\n\n\nPorts\n\n\n\n\n\n\n\n\nPort\n\n\nDescription\n\n\nType\n\n\nMandatory\n\n\n\n\n\n\n\n\n\n\nin\n\n\nTuples which need to be formatted are received on this port\n\n\nObject (POJO)\n\n\nYes\n\n\n\n\n\n\nout\n\n\nTuples that are formatted are emitted from this port\n\n\nString\n\n\nNo\n\n\n\n\n\n\nerr\n\n\nTuples that could not be converted are emitted on this port\n\n\nObject\n\n\nNo\n\n\n\n\n\n\n\n\nLimitations\n\n\nCurrent CsvFormatter contain following limitations:\n\n\n\n\nThe field names in schema and the pojo field names should match.For eg. if name of the schema field is \"customerName\", then POJO should contain a field with the same name. \n\n\nField wise validation/formatting is not yet supported.\n\n\nTh
e fields will be written to the file in the same order as specified in schema.json\n\n\n\n\nExample\n\n\nExample for CsvFormatter can be found at: \nhttps://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter\n\n\nAdvanced\n\n\n Schema format for CsvFormatter\n\n\nCsvFormatter expects schema to be a String in JSON format:\n\n\nExample for format of schema:\n\n\n{\n \nseparator\n: \n,\n,\n \nquoteChar\n: \n\\\n,\n \nlineDelimiter\n: \n\\n\n,\n \nfields\n: [\n {\n \nname\n: \ncampaignId\n,\n \ntype\n: \nInteger\n\n },\n {\n \nname\n: \nstartDate\n,\n \ntype\n: \nDate\n,\n \nconstraints\n: {\n \nformat\n: \nyyyy-MM-dd\n\n }\n }\n ]\n}\n\n\n\n\nPartitioning of CsvFormatter\n\n\nBeing stateless operator, CsvFormatter will ensure built-in partitioners present in Malhar library can be directly used by setting properties as follows:\n\n\nStateless partioning of CsvFormatter\n\n\nStateless partitioning will ensure that Cs
vFormatter will be partitioned right at the start of the application and will remain partitioned throughout the lifetime of the DAG.\nCsvFormatter can be stateless partitioned by adding following lines to properties.xml:\n\n\n \nproperty\n\n \nname\ndt.operator.{OperatorName}.attr.PARTITIONER\n/name\n\n \nvalue\ncom.datatorrent.common.partitioner.StatelessPartitioner:2\n/value\n\n \n/property\n\n\n\n\n\nwhere {OperatorName} is the name of the CsvFormatter operator.\nAbove lines will partition CsvFormatter statically 2 times. Above value can be changed accordingly to change the number of static partitions.\n\n\nDynamic Partitioning of CsvFormatter\n\n\nDynamic partitioning is a feature of Apex platform which changes the partition of the operator based on certain conditions.\nCsvFormatter can be dynamically partitioned using below out-of-the-box partitioner:\n\n\nThroughput based\n\n\nFollowing code can be added to populateDAG method of application to dynamically partition Csv
Formatter:\n\n\n StatelessThroughputBasedPartitioner\nCsvFormatter\n partitioner = new StatelessThroughputBasedPartitioner\n();\n partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));\n partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));\n partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));\n dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));\n dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner);\n\n\n\n\nAbove code will dynamically partition CsvFormatter when throughput changes.\nIf overall throughput of CsvFormatter goes beyond 30000 or less than 10000, the platform will repartition CsvFormatter \nto balance throughput of a single partition to be between 10000 and 30000.\nCooldownMillis of 10000 will be used as threshold time for which throughput change is observed.",
+ "title": "CSV Formatter"
+ },
+ {
+ "location": "/operators/csvformatter/#csvformatter",
+ "text": "",
+ "title": "CsvFormatter"
+ },
+ {
+ "location": "/operators/csvformatter/#operator-objective",
+ "text": "This operator receives a POJO ( Plain Old Java Object ) as an incoming tuple, converts the data in \nthe incoming POJO to a custom delimited string and emits the delimited string. CsvFormatter supports schema definition as a JSON string. CsvFormatter does not hold any state and is idempotent , fault-tolerant and statically/dynamically partitionable .",
+ "title": "Operator Objective"
+ },
+ {
+ "location": "/operators/csvformatter/#operator-information",
+ "text": "Operator location: malhar-contrib Available since: 3.2.0 Operator state: Evolving Java Packages: Operator: com.datatorrent.contrib.formatter.CsvFormatter",
+ "title": "Operator Information"
+ },
+ {
+ "location": "/operators/csvformatter/#properties-attributes-and-ports",
+ "text": "",
+ "title": "Properties, Attributes and Ports"
+ },
+ {
+ "location": "/operators/csvformatter/#platform-attributes-that-influences-operator-behavior",
+ "text": "Attribute Description Type Mandatory in.TUPLE_CLASS TUPLE_CLASS attribute on input port which tells operator the class of POJO which will be incoming Class or FQCN Yes",
+ "title": "Platform Attributes that influences operator behavior"
+ },
+ {
+ "location": "/operators/csvformatter/#ports",
+ "text": "Port Description Type Mandatory in Tuples which need to be formatted are received on this port Object (POJO) Yes out Tuples that are formatted are emitted from this port String No err Tuples that could not be converted are emitted on this port Object No",
+ "title": "Ports"
+ },
+ {
+ "location": "/operators/csvformatter/#limitations",
+ "text": "Current CsvFormatter contain following limitations: The field names in schema and the pojo field names should match.For eg. if name of the schema field is \"customerName\", then POJO should contain a field with the same name. Field wise validation/formatting is not yet supported. The fields will be written to the file in the same order as specified in schema.json",
+ "title": "Limitations"
+ },
+ {
+ "location": "/operators/csvformatter/#example",
+ "text": "Example for CsvFormatter can be found at: https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter",
+ "title": "Example"
+ },
+ {
+ "location": "/operators/csvformatter/#advanced",
+ "text": "",
+ "title": "Advanced"
+ },
+ {
+ "location": "/operators/csvformatter/#partitioning-of-csvformatter",
+ "text": "Being stateless operator, CsvFormatter will ensure built-in partitioners present in Malhar library can be directly used by setting properties as follows:",
+ "title": "Partitioning of CsvFormatter"
+ },
+ {
+ "location": "/operators/csvformatter/#stateless-partioning-of-csvformatter",
+ "text": "Stateless partitioning will ensure that CsvFormatter will be partitioned right at the start of the application and will remain partitioned throughout the lifetime of the DAG.\nCsvFormatter can be stateless partitioned by adding following lines to properties.xml: property \n name dt.operator.{OperatorName}.attr.PARTITIONER /name \n value com.datatorrent.common.partitioner.StatelessPartitioner:2 /value \n /property where {OperatorName} is the name of the CsvFormatter operator.\nAbove lines will partition CsvFormatter statically 2 times. Above value can be changed accordingly to change the number of static partitions.",
+ "title": "Stateless partioning of CsvFormatter"
+ },
+ {
+ "location": "/operators/csvformatter/#dynamic-partitioning-of-csvformatter",
+ "text": "Dynamic partitioning is a feature of Apex platform which changes the partition of the operator based on certain conditions.\nCsvFormatter can be dynamically partitioned using below out-of-the-box partitioner:",
+ "title": "Dynamic Partitioning of CsvFormatter"
+ },
+ {
+ "location": "/operators/csvformatter/#throughput-based",
+ "text": "Following code can be added to populateDAG method of application to dynamically partition CsvFormatter: StatelessThroughputBasedPartitioner CsvFormatter partitioner = new StatelessThroughputBasedPartitioner ();\n partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));\n partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));\n partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));\n dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));\n dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner); Above code will dynamically partition CsvFormatter when throughput changes.\nIf overall throughput of CsvFormatter goes beyond 30000 or less than 10000, the platform will repartition CsvFormatter \nto balance throughput of a single partition to be between 10000 and 30000.\nCooldownMillis of 10000 will be used as threshold time for which t
hroughput change is observed.",
+ "title": "Throughput based"
+ },
+ {
+ "location": "/operators/csvParserOperator/",
+ "text": "Csv Parser Operator\n\n\nOperator Objective\n\n\nThis operator is designed to parse delimited records and construct a map or concrete java class also known as \n\"POJO\"\n out of it. User need to provide the schema to describe the delimited data. Based on schema definition the operator will parse the incoming record to object map and POJO. User can also provide constraints if any, in the schema. The supported constraints are listed in \nconstraints table\n. The incoming record will be validated against those constraints. Valid records will be emitted as POJO / map while invalid ones are emitted on error port with error message.\n\n\nNote\n: field names of POJO must match field names in schema and in the same order as it appears in the incoming data.\n\n\nOverview\n\n\nThe operator is \nidempotent\n, \nfault-tolerant\n and \npartitionable\n.\n\n\nClass Diagram\n\n\n\n\nOperator Information\n\n\n\n\nOperator location:\nmalhar-contrib\n\n\nAvailable since:\n3.2.0\
n\n\nOperator state:\nEvolving\n\n\nJava Package:\ncom.datatorrent.contrib.parser.CsvParser\n\n\
<TRUNCATED>