Posted to commits@apex.apache.org by th...@apache.org on 2016/11/27 01:15:20 UTC

[06/15] apex-site git commit: Adding malhar- documentation

http://git-wip-us.apache.org/repos/asf/apex-site/blob/357a5a07/docs/malhar-3.6/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/docs/malhar-3.6/mkdocs/search_index.json b/docs/malhar-3.6/mkdocs/search_index.json
new file mode 100644
index 0000000..9248536
--- /dev/null
+++ b/docs/malhar-3.6/mkdocs/search_index.json
@@ -0,0 +1,1429 @@
+{
+    "docs": [
+        {
+            "location": "/", 
+            "text": "Apache Apex Malhar\n\n\nApache Apex Malhar is an open source operator and codec library that can be used with the \nApache Apex\n platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of demos applications, demonstrating operator features and capabilities.\n\n\n\n\nCapabilities common across Malhar operators\n\n\nFor most streaming platforms, connectors are afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the platform. As a result they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have following core streaming runtime capabilities\n\n\n\n\nFault tolerance\n \u2013 Malhar operators where appli
 cable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario.\n\n\nProcessing guarantees\n \u2013 Malhar operators where applicable provide out of the box support for ALL three processing guarantees \u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code.  Some operators, like MQTT operator, deal with source systems that can not track processed data and hence need the operators to keep track of the data.  Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this.  Finally for databases that support transactions or support any sort of atomic batch operations Malhar operators can do exactly once down to the tuple level.\n\n\nDynamic updates\n \u2013 Based on changing business conditions you often have to tweak several parameters used by the operators in your streaming application without incur
 ring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application.\n\n\nEase of extensibility\n \u2013 Malhar operators are based on templates that are easy to extend.\n\n\nPartitioning support\n \u2013 In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system.  For example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.\n\n\n\n\nOperator Library Overview\n\n\nInput/output connectors\n\n\nBelow is a summary of the various sub categories of input and output operators. Input operators also have a corresponding output operator\n\n\n\n\nFile Systems\n \u2013 Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the applica
 tion is running in AWS.  Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share.  Apex supports input \n output operators for HDFS, S3, NFS \n Local Files.  There are also File Splitter and Block Reader operators, which can accelerate processing of large files by splitting and parallelizing the work across non-overlapping sets of file blocks.\n\n\nRelational Databases\n \u2013 Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational dashboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.\n\n\nNoSQL Databases\n \u2013 NoSQL key-value pair databases like Cassandra \n HBase are a common part of streaming analytics applicati
 on architectures to lookup reference data or store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB.\n\n\nMessaging Systems\n \u2013 Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises.  Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages.\n\n\nNotification Systems\n \u2013 Malhar includes an operator for sending notifications via SMTP.\n\n\nIn-memory Databases \n Caching platforms\n - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar has operators for memcached and Redis.\n\n\nSocial Media\n - Malhar includes an operator to connect to the popular Twitter stream fire hose.\n\n\nProtocols\n - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and MQT
 T.\n\n\n\n\nParsers\n\n\nThere are many industry vertical specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example in the Telco space, a Java based CDR parser can be directly plugged into Apache Apex operator. To further simplify development experience, Malhar also provides some operators for parsing common formats like XML (DOM \n SAX), JSON (flat map converter), Apache log files, syslog, etc.\n\n\nStream manipulation\n\n\nStreaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW.  Malhar has several operators to perform the common stream manipulation actions like \u2013 GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Out
 er join, Select, Update etc.\n\n\nCompute\n\n\nOne of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support variety of common compute functions as the user then has to worry about making these scalable, fault tolerant, stateful, etc.  Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators.\n\n\nBelow is just a snapshot of the compute operators available in Malhar\n\n\n\n\nStatistics and math - Various mathematical and statistical computations over application defined time windows.\n\n\nFiltering and pattern matching\n\n\nSorting, maps, frequency, TopN, BottomN\n\n\nRandom data generators\n\n\n\n\nLanguages Support\n\n\nMigrating to a new platform often requires re-use of the existing code that would be diffic
 ult or time-consuming to re-write.  With this in mind, Malhar supports invocation of code written in other languages by wrapping them in one of the library operators, and allows execution of software written in:\n\n\n\n\nJavaScript\n\n\nPython\n\n\nR\n\n\nRuby", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#apache-apex-malhar", 
+            "text": "Apache Apex Malhar is an open source operator and codec library that can be used with the  Apache Apex  platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of demos applications, demonstrating operator features and capabilities.", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#capabilities-common-across-malhar-operators", 
+            "text": "For most streaming platforms, connectors are afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the platform. As a result they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have following core streaming runtime capabilities   Fault tolerance  \u2013 Malhar operators where applicable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario.  Processing guarantees  \u2013 Malhar operators where applicable provide out of the box support for ALL three processing guarantees \u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code.  Some operators, like MQTT operator, deal with source systems that can not track processed data and hence n
 eed the operators to keep track of the data.  Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this.  Finally for databases that support transactions or support any sort of atomic batch operations Malhar operators can do exactly once down to the tuple level.  Dynamic updates  \u2013 Based on changing business conditions you often have to tweak several parameters used by the operators in your streaming application without incurring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application.  Ease of extensibility  \u2013 Malhar operators are based on templates that are easy to extend.  Partitioning support  \u2013 In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system.  Fo
 r example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.", 
+            "title": "Capabilities common across Malhar operators"
+        }, 
+        {
+            "location": "/#operator-library-overview", 
+            "text": "", 
+            "title": "Operator Library Overview"
+        }, 
+        {
+            "location": "/#inputoutput-connectors", 
+            "text": "Below is a summary of the various sub categories of input and output operators. Input operators also have a corresponding output operator   File Systems  \u2013 Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the application is running in AWS.  Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share.  Apex supports input   output operators for HDFS, S3, NFS   Local Files.  There are also File Splitter and Block Reader operators, which can accelecate processing of large files by splitting and paralellizing the work across non-overlapping sets of file blocks.  Relational Databases  \u2013 Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational das
 hboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.  NoSQL Databases  \u2013 NoSQL key-value pair databases like Cassandra   HBase are a common part of streaming analytics application architectures to lookup reference data or store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB.  Messaging Systems  \u2013 Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises.  Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages.  Notification Systems  \u2013 Malhar includes an operator for sending notifications via SMTP.  In-memory Databases   Caching platforms  - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar
  has operators for memcached and Redis.  Social Media  - Malhar includes an operator to connect to the popular Twitter stream fire hose.  Protocols  - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and MQTT.", 
+            "title": "Input/output connectors"
+        }, 
+        {
+            "location": "/#parsers", 
+            "text": "There are many industry vertical specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example in the Telco space, a Java based CDR parser can be directly plugged into Apache Apex operator. To further simplify development experience, Malhar also provides some operators for parsing common formats like XML (DOM   SAX), JSON (flat map converter), Apache log files, syslog, etc.", 
+            "title": "Parsers"
+        }, 
+        {
+            "location": "/#stream-manipulation", 
+            "text": "Streaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW.  Malhar has several operators to perform the common stream manipulation actions like \u2013 GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Outer join, Select, Update etc.", 
+            "title": "Stream manipulation"
+        }, 
+        {
+            "location": "/#compute", 
+            "text": "One of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support variety of common compute functions as the user then has to worry about making these scalable, fault tolerant, stateful, etc.  Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators.  Below is just a snapshot of the compute operators available in Malhar   Statistics and math - Various mathematical and statistical computations over application defined time windows.  Filtering and pattern matching  Sorting, maps, frequency, TopN, BottomN  Random data generators", 
+            "title": "Compute"
+        }, 
+        {
+            "location": "/#languages-support", 
+            "text": "Migrating to a new platform often requires re-use of the existing code that would be difficult or time-consuming to re-write.  With this in mind, Malhar supports invocation of code written in other languages by wrapping them in one of the library operators, and allows execution of software written in:   JavaScript  Python  R  Ruby", 
+            "title": "Languages Support"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/", 
+            "text": "KAFKA INPUT OPERATOR\n\n\nIntroduction\n\n\nApache Kafka\n is a pull-based and distributed publish subscribe messaging system,\ntopics are partitioned and replicated across nodes. \n\n\nThe Kafka input operator consumes data from the partitions of a Kafka topic for processing in Apex. \nThe operator has the ability to automatically scale with the Kafka partitioning for high throughput. \nIt is fault-tolerant (consumer offset checkpointing) and guarantees idempotency to allow exactly-once results in the downstream pipeline.\n\n\nFor more information about the operator design see this \npresentation\n\nand for processing guarantees this \nblog\n.\n\n\nThere are two separate implementations of the input operator,\none built against Kafka 0.8 client and a newer version for the\nKafka 0.9 consumer API that also works with MapR Streams.\nThese reside in different packages and are described separately below.\n\n\nKafka Input Operator for Kafka 0.8.x\n\n\nPackage: \ncom
 .datatorrent.contrib.kafka\n\n\nMaven artifact: \nmalhar-contrib\n\n\nAbstractKafkaInputOperator\n\n\nThis is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn\u2019t have any ports.\n\n\n\n\nConfiguration Parameters\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nDescription\n\n\n\n\n\n\nmaxTuplesPerWindow\n\n\nControls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. Default value = MAX_VALUE \n\n\n\n\n\n\nidempotentStorageManager\n\n\nThis is an instance of IdempotentStorageManager. Idempotency ensures that the operator will process the same set of messages in a window before and after a failure. For example, let's say the operator completed window 10 and failed somewhere between window 11. If the operator gets restored at window 10 then it will process the same messages again in window 10 which it did in the previous run before the failure. Idempotency is impor
 tant but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Default Value = com.datatorrent.lib.io.IdempotentStorageManager.\nNoopIdempotentStorageManager\n\n\n\n\n\n\nstrategy\n\n\nOperator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.\n\n\nONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.\n\n\nONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partition
 s(N) = 2 then AppMaster creates 2 Kafka input operator instances.\nDefault Value = ONE_TO_ONE\n\n\n\n\n\n\nmsgRateUpperBound\n\n\nMaximum messages upper bound. Operator repartitions when the \nmsgProcessedPS\n exceeds this bound. \nmsgProcessedPS\n is the average number of messages processed per second by this operator.\n\n\n\n\n\n\nbyteRateUpperBound\n\n\nMaximum bytes upper bound. Operator repartitions when the \nbytesPS\n exceeds this bound. \nbytesPS\n is the average number of bytes processed per second by this operator.\n\n\n\n\n\n\n\n\noffsetManager\n\n\nThis is an optional parameter that is useful when the application restarts or start at specific offsets (offsets are explained below)\n\n\n\n\n\n\nrepartitionInterval\n\n\nInterval specified in milliseconds. This value specifies the minimum time required between two repartition actions. Default Value = 30 Seconds\n\n\n\n\n\n\nrepartitionCheckInterval\n\n\nInterval specified in milliseconds. This value specifies the minimum int
 erval between two offset updates. Default Value = 5 Seconds\n\n\n\n\n\n\ninitialPartitionCount\n\n\nWhen the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. Default Value = 1\n\n\n\n\n\n\nconsumer\n\n\nThis is an instance of com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of SimpleKafkaConsumer.\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\nvoid emitTuple(Message message)\n: Abstract method that emits tuples extracted from Kafka message.\n\n\nKafkaConsumer\n\n\nThis is an abstract implementation of Kafka consumer. It sends the fetch\nrequests to the leading brokers of Kafka partitions. For each request,\nit receives the set of messages and stores them into the buffer which is\nArrayBlockingQueue. SimpleKafkaConsumer\u00a0which extends\nKafkaConsumer and serves the functionality of Simple Consumer API and\nHighLevelKafkaConsumer which extends KafkaConsumer and \u00a0serves the\nfunctionality of High Level Co
 nsumer API.\n\n\nPre-requisites\n\n\nThis operator uses the Kafka 0.8.2.1 client consumer API\nand will work with 0.8.x and 0.7.x versions of Kafka broker.\n\n\nConfiguration Parameters\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nType\n\n\nDefault\n\n\nDescription\n\n\n\n\n\n\nzookeeper\n\n\nString\n\n\n\n\nSpecifies the zookeeper quorum of Kafka clusters that you want to consume messages from. zookeeper \u00a0is a string in the form of hostname1:port1,hostname2:port2,hostname3:port3 \u00a0where hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of zookeeper server. \u00a0If the topic name is the same across the Kafka clusters and want to consume data from these clusters, then configure the zookeeper as follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5,c3::hs6:p6\n\n\nwhere\n\n\nc1,c2,c3 indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is optional in case of single cl
 uster\n\n\n\n\n\n\ncacheSize\n\n\nint\n\n\n1024\n\n\nMaximum of buffered messages hold in memory.\n\n\n\n\n\n\ntopic\n\n\nString\n\n\ndefault_topic\n\n\nIndicates the name of the topic.\n\n\n\n\n\n\ninitialOffset\n\n\nString\n\n\nlatest\n\n\nIndicates the type of offset i.e, \u201cearliest or latest\u201d. If initialOffset is \u201clatest\u201d, then the operator consumes messages from latest point of Kafka queue. If initialOffset is \u201cearliest\u201d, then the operator consumes messages starting from message queue. This can be overridden by OffsetManager.\n\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\n\n\nvoid commitOffset(): Commit the offsets at checkpoint.\n\n\nMap \nKafkaPartition, Long\n getCurrentOffsets(): Return the current\n    offset status.\n\n\nresetPartitionsAndOffset(Set \nKafkaPartition\n partitionIds,\n    Map \nKafkaPartition, Long\n startOffset): Reset the partitions with\n    parittionIds and offsets with startOffset.\n\n\n\n\nConfiguration Parameters\u00a0for Simpl
 eKafkaConsumer\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nType\n\n\nDefault\n\n\nDescription\n\n\n\n\n\n\nbufferSize\n\n\nint\n\n\n1 MB\n\n\nSpecifies the maximum total size of messages for each fetch request.\n\n\n\n\n\n\nmetadataRefreshInterval\n\n\nint\n\n\n30 Seconds\n\n\nInterval in between refresh the metadata change(broker change) in milliseconds. Enabling metadata refresh guarantees an automatic reconnect when a new broker is elected as the host. A value of -1 disables this feature.\n\n\n\n\n\n\nmetadataRefreshRetryLimit\n\n\nint\n\n\n-1\n\n\nSpecifies the maximum brokers' metadata refresh retry limit. -1 means unlimited retry.\n\n\n\n\n\n\n\n\n\nOffsetManager\n\n\nThis is an interface for offset management and is useful when consuming data\nfrom specified offsets. Updates the offsets for all the Kafka partitions\nperiodically. Below is the code snippet:\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\n\n\npublic interface OffsetManager\n{\n  public Map\nKafkaPartition,
  Long\n loadInitialOffsets();\n  public void updateOffsets(Map\nKafkaPartition, Long\n offsetsOfPartitions);\n}\n\n\n\n\nAbstract Methods\n\n\nMap \nKafkaPartition, Long\n loadInitialOffsets()\n: Specifies the initial offset for consuming messages; called at the activation stage.\n\n\nupdateOffsets(Map\nKafkaPartition, Long\n offsetsOfPartitions)\n: \u00a0This\nmethod is called at every repartitionCheckInterval to update offsets.\n\n\nPartitioning\n\n\nThe logical instance of the KafkaInputOperator acts as the Partitioner\nas well as a StatsListener. This is because the\nAbstractKafkaInputOperator implements both the\ncom.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener\ninterfaces and provides an implementation of definePartitions(...) and\nprocessStats(...) which makes it auto-scalable.\n\n\nResponse processStats(BatchedOperatorStats stats)\n\n\nThe application master invokes this method on the logical instance with\nthe stats (tuplesProcessedPS, bytesPS, etc.) of
  each partition.\nRe-partitioning happens based on whether any new Kafka partitions added for\nthe topic or bytesPS and msgPS cross their respective upper bounds.\n\n\nDefinePartitions\n\n\nBased on the repartitionRequired field of the Response object which is\nreturned by processStats(...) method, the application master invokes\ndefinePartitions(...) on the logical instance which is also the\npartitioner instance. Dynamic partition can be disabled by setting the\nparameter repartitionInterval value to a negative value.\n\n\nAbstractSinglePortKafkaInputOperator\n\n\nThis class extends AbstractKafkaInputOperator to emit messages through single output port.\n\n\nPorts\n\n\noutputPort \nT\n: Tuples extracted from Kafka messages are emitted through this port.\n\n\nAbstract Methods\n\n\nT getTuple(Message msg)\n: Converts the Kafka message to tuple.\n\n\nConcrete Classes\n\n\n\n\nKafkaSinglePortStringInputOperator: extends \nAbstractSinglePortKafkaInputOperator\n, extracts string from Ka
 fka message.\n\n\nApplication Example\n\n\nThis section builds an Apex application using Kafka input operator.\nBelow is the code snippet:\n\n\n@ApplicationAnnotation(name = \nKafkaApp\n)\npublic class ExampleKafkaApplication implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration entries)\n  {\n    KafkaSinglePortByteArrayInputOperator input =  dag.addOperator(\nMessageReader\n, new KafkaSinglePortByteArrayInputOperator());\n    ConsoleOutputOperator output = dag.addOperator(\nOutput\n, new ConsoleOutputOperator());\n    dag.addStream(\nMessageData\n, input.outputPort, output.input);\n  }\n}\n\n\n\n\nBelow is the configuration for \u201ctest\u201d Kafka topic name and\n\u201clocalhost:2181\u201d is the zookeeper quorum:\n\n\nproperty\n\n  \nname\ndt.operator.MessageReader.prop.topic\n/name\n\n  \nvalue\nte
 st\n/value\n\n\n/property\n\n\n\nproperty\n\n  \nname\ndt.operator.KafkaInputOperator.prop.zookeeper\n/nam\n\n  \nvalue\nlocalhost:2181\n/value\n\n\n/property\n\n\n\n\n\nKafka Input Operator for Kafka 0.9.x\n\n\nPackage: \norg.apache.apex.malhar.kafka\n\n\nMaven Artifact: \nmalhar-kafka\n\n\nThis version uses the new 0.9 version of consumer API and works with Kafka broker version 0.9 and later.\nThe operator is fault-tolerant, scalable and supports input from multiple clusters and multiple topics in a single operator instance.\n\n\nPre-requisites\n\n\nThis operator requires version 0.9.0 or later of the Kafka Consumer API.\n\n\nAbstractKafkaInputOperator\n\n\nPorts\n\n\n\n\nThis abstract class doesn't have any ports.\n\n\nConfiguration properties\n\n\n\n\n\n\n\n\nclusters\n - String[]\n\n\n\n\nMandatory Parameter.\n\n\nSpecifies the Kafka clusters that you want to consume messages from. To configure multi-cluster support, you need to specify the clusters separated by \";\".\n\n\n\n\
 n\n\n\n\ntopics\n - String[]\n\n\n\n\nMandatory Parameter.\n\n\nSpecifies the Kafka topics that you want to consume messages from. If you want multi-topic support, then specify the topics separated by \",\".\n\n\n\n\n\n\n\n\nstrategy\n - PartitionStrategy\n\n\n\n\n\n\nOperator supports two types of partitioning strategies, \nONE_TO_ONE\n and \nONE_TO_MANY\n.\n\n\nONE_TO_ONE\n: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.\n\nONE_TO_MANY\n: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and numb
 er of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.\nDefault Value = \nPartitionStrategy.ONE_TO_ONE\n.\n\n\n\n\n\n\n\n\n\n\ninitialPartitionCount\n - Integer\n\n\n\n\nWhen the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. \n    Default Value = 1.\n\n\n\n\n\n\n\n\nrepartitionInterval\n - Long\n\n\n\n\nInterval specified in milliseconds. This value specifies the minimum time required between two repartition actions. \n    Default Value = 30 Seconds.\n\n\n\n\n\n\n\n\nrepartitionCheckInterval\n - Long\n\n\n\n\nInterval specified in milliseconds. This value specifies the minimum interval between two stat checks.\n    Default Value = 5 Seconds.\n\n\n\n\n\n\n\n\nmaxTuplesPerWindow\n - Integer\n\n\n\n\nControls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. \n    Default value = \nMAX_VALUE\n \n\n\n\n\n\n\n\n\ninitialOffset\n - InitialOf
 fset\n\n\n\n\nIndicates the type of offset i.e, \nEARLIEST\n or \nLATEST\n or \nAPPLICATION_OR_EARLIEST\n or \nAPPLICATION_OR_LATEST\n. \n    \nLATEST\n =\n Consume new messages from latest offset in the topic. \n    \nEARLIEST\n =\n Consume all messages available in the topic.\n    \nAPPLICATION_OR_EARLIEST\n =\n Consume messages from committed position from last run. If there is no committed offset, then start consuming from beginning.\n    \nAPPLICATION_OR_LATEST\n =\n Consumes messages from committed position from last run. If a committed offset is unavailable, then start consuming from latest position.\n    Default value = \nInitialOffset.APPLICATION_OR_LATEST\n\n\n\n\n\n\n\n\nmetricsRefreshInterval\n - Long\n\n\n\n\nInterval specified in milliseconds. This value specifies the minimum interval between two metric stat updates.\n    Default value = 5 Seconds.\n\n\n\n\n\n\n\n\nconsumerTimeout\n - Long\n\n\n\n\nIndicates the \ntime waiting in poll\n when data is not available.\n   
  Default value = 5 Seconds.\n\n\n\n\n\n\n\n\nholdingBufferSize\n - Long\n\n\n\n\nIndicates the maximum number of messages kept in memory for emitting.\n    Default value = 1024.\n\n\n\n\n\n\n\n\nconsumerProps\n - Properties\n\n\n\n\nSpecify the consumer properties (http://kafka.apache.org/090/documentation.html#newconsumerconfigs) which are not yet set to the operator.\n\n\n\n\n\n\n\n\nwindowDataManager\n - WindowDataManager\n\n\n\n\nIf set to a value other than the default, such as \nFSWindowDataManager\n, specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window.\n    Default value = \nWindowDataManager.NoopWindowDataManager\n.\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\nvoid emitTuple(String cluster, ConsumerRecord\nbyte[], byte[]\n message)\n: Abstract method that emits tuples\nextracted
  from Kafka message.\n\n\nConcrete Classes\n\n\nKafkaSinglePortInputOperator\n\n\nThis class extends from AbstractKafkaInputOperator and defines the \ngetTuple()\n method which extracts byte array from Kafka message.\n\n\nPorts\n\n\noutputPort \nbyte[]\n: Tuples extracted from Kafka messages are emitted through this port.\n\n\nApplication Example\n\n\nThis section builds an Apex application using Kafka input operator.\nBelow is the code snippet:\n\n\n@ApplicationAnnotation(name = \nKafkaApp\n)\npublic class ExampleKafkaApplication implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration entries)\n  {\n    KafkaSinglePortInputOperator input =  dag.addOperator(\nMessageReader\n, new KafkaSinglePortInputOperator());\n    ConsoleOutputOperator output = dag.addOperator(\nOutput\n, new ConsoleOutputOperator());\n    dag.addStream(\nMessageData\n, input.outputPort, output.input);\n  }\n}\n\n\n\n\nBelow is the configuration for \u201ctest\u201d Kafka
  topic name and\n\u201clocalhost:9092\u201d is the Broker:\n\n\nproperty\n\n  \nname\ndt.operator.MessageReader.prop.topics\n/name\n\n  \nvalue\ntest\n/value\n\n\n/property\n\n\n\nproperty\n\n  \nname\ndt.operator.KafkaInputOperator.prop.clusters\n/name\n\n  \nvalue\nlocalhost:9092\n/value\n\n\n/property\n\n\n\n\n\nBy adding the following lines to the properties file, the Kafka Input Operator supports multi-topic and multi-cluster:\n\n\nproperty\n\n  \nname\ndt.operator.MessageReader.prop.topics\n/name\n\n  \nvalue\ntest1, test2\n/value\n\n\n/property\n\n\n\nproperty\n\n  \nname\ndt.operator.KafkaInputOperator.prop.clusters\n/name\n\n  \nvalue\nlocalhost:9092; localhost:9093; localhost:9094\n/value\n\n\n/property\n\n\n\n\n\nFor a full example application project, refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka", 
+            "title": "Kafka Input"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafka-input-operator", 
+            "text": "", 
+            "title": "KAFKA INPUT OPERATOR"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#introduction", 
+            "text": "Apache Kafka  is a pull-based and distributed publish subscribe messaging system,\ntopics are partitioned and replicated across nodes.   The Kafka input operator consumes data from the partitions of a Kafka topic for processing in Apex. \nThe operator has the ability to automatically scale with the Kafka partitioning for high throughput. \nIt is fault-tolerant (consumer offset checkpointing) and guarantees idempotency to allow exactly-once results in the downstream pipeline.  For more information about the operator design see this  presentation \nand for processing guarantees this  blog .  There are two separate implementations of the input operator,\none built against Kafka 0.8 client and a newer version for the\nKafka 0.9 consumer API that also works with MapR Streams.\nThese reside in different packages and are described separately below.", 
+            "title": "Introduction"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafka-input-operator-for-kafka-08x", 
+            "text": "Package:  com.datatorrent.contrib.kafka  Maven artifact:  malhar-contrib", 
+            "title": "Kafka Input Operator for Kafka 0.8.x"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstractkafkainputoperator", 
+            "text": "This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn\u2019t have any ports.", 
+            "title": "AbstractKafkaInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#configuration-parameters", 
+            "text": "Parameter  Description    maxTuplesPerWindow  Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. Default value = MAX_VALUE     idempotentStorageManager  This is an instance of IdempotentStorageManager. Idempotency ensures that the operator will process the same set of messages in a window before and after a failure. For example, let's say the operator completed window 10 and failed somewhere between window 11. If the operator gets restored at window 10 then it will process the same messages again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Default Value = com.datatorrent.lib.io.IdempotentStorageManager. NoopIdempotentStorageManager    strategy  Operator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.
   ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.  ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.\nDefault Value = ONE_TO_ONE    msgRateUpperBound  Maximum messages upper bound. Operator repartitions when the  msgProcessedPS  exceeds this bound.  msgProcessedPS  is the average number of messages processed per second by this operator.    byteRateUpperBound  Maximum bytes upper bo
 und. Operator repartitions when the  bytesPS  exceeds this bound.  bytesPS  is the average number of bytes processed per second by this operator.     offsetManager  This is an optional parameter that is useful when the application restarts or start at specific offsets (offsets are explained below)    repartitionInterval  Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions. Default Value = 30 Seconds    repartitionCheckInterval  Interval specified in milliseconds. This value specifies the minimum interval between two offset updates. Default Value = 5 Seconds    initialPartitionCount  When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. Default Value = 1    consumer  This is an instance of com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of SimpleKafkaConsumer.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods", 
+            "text": "void emitTuple(Message message) : Abstract method that emits tuples extracted from Kafka message.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafkaconsumer", 
+            "text": "This is an abstract implementation of Kafka consumer. It sends the fetch\nrequests to the leading brokers of Kafka partitions. For each request,\nit receives the set of messages and stores them into the buffer which is\nArrayBlockingQueue. SimpleKafkaConsumer\u00a0which extends\nKafkaConsumer and serves the functionality of Simple Consumer API and\nHighLevelKafkaConsumer which extends KafkaConsumer and \u00a0serves the\nfunctionality of High Level Consumer API.", 
+            "title": "KafkaConsumer"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#pre-requisites", 
+            "text": "This operator uses the Kafka 0.8.2.1 client consumer API\nand will work with 0.8.x and 0.7.x versions of Kafka broker.", 
+            "title": "Pre-requisites"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#configuration-parameters_1", 
+            "text": "Parameter  Type  Default  Description    zookeeper  String   Specifies the zookeeper quorum of Kafka clusters that you want to consume messages from. zookeeper \u00a0is a string in the form of hostname1:port1,hostname2:port2,hostname3:port3 \u00a0where hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of zookeeper server. \u00a0If the topic name is the same across the Kafka clusters and want to consume data from these clusters, then configure the zookeeper as follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5,c3::hs6:p6  where  c1,c2,c3 indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is optional in case of single cluster    cacheSize  int  1024  Maximum of buffered messages hold in memory.    topic  String  default_topic  Indicates the name of the topic.    initialOffset  String  latest  Indicates the type of offset i.e, \u201cearliest or latest\u201
 d. If initialOffset is \u201clatest\u201d, then the operator consumes messages from latest point of Kafka queue. If initialOffset is \u201cearliest\u201d, then the operator consumes messages starting from message queue. This can be overridden by OffsetManager.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_1", 
+            "text": "void commitOffset(): Commit the offsets at checkpoint.  Map  KafkaPartition, Long  getCurrentOffsets(): Return the current\n    offset status.  resetPartitionsAndOffset(Set  KafkaPartition  partitionIds,\n    Map  KafkaPartition, Long  startOffset): Reset the partitions with\n    parittionIds and offsets with startOffset.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#configuration-parameters-for-simplekafkaconsumer", 
+            "text": "Parameter  Type  Default  Description    bufferSize  int  1 MB  Specifies the maximum total size of messages for each fetch request.    metadataRefreshInterval  int  30 Seconds  Interval in between refresh the metadata change(broker change) in milliseconds. Enabling metadata refresh guarantees an automatic reconnect when a new broker is elected as the host. A value of -1 disables this feature.    metadataRefreshRetryLimit  int  -1  Specifies the maximum brokers' metadata refresh retry limit. -1 means unlimited retry.", 
+            "title": "Configuration Parameters\u00a0for SimpleKafkaConsumer"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#offsetmanager", 
+            "text": "This is an interface for offset management and is useful when consuming data\nfrom specified offsets. Updates the offsets for all the Kafka partitions\nperiodically. Below is the code snippet:\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0  public interface OffsetManager\n{\n  public Map KafkaPartition, Long  loadInitialOffsets();\n  public void updateOffsets(Map KafkaPartition, Long  offsetsOfPartitions);\n}", 
+            "title": "OffsetManager"
+        }, 
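As an illustration of the OffsetManager interface shown in the entry above, here is a minimal Java sketch that pins every registered partition to a fixed starting offset and merely logs the periodic updates. The class name FixedStartOffsetManager, the addPartition helper and the fixed-offset behaviour are illustrative assumptions, and the package location of OffsetManager and KafkaPartition is assumed to be com.datatorrent.contrib.kafka alongside the 0.8.x operator; a real implementation would typically persist the offsets (for example to HDFS) and would need to be serializable when set as an operator property.

import java.util.HashMap;
import java.util.Map;

import com.datatorrent.contrib.kafka.KafkaPartition;
import com.datatorrent.contrib.kafka.OffsetManager;

// Illustrative sketch only: start every registered partition from offset 0
// and print the periodic offset updates instead of persisting them.
public class FixedStartOffsetManager implements OffsetManager
{
  private final Map<KafkaPartition, Long> initialOffsets = new HashMap<>();

  // Hypothetical helper: register a partition that should start from offset 0.
  public void addPartition(KafkaPartition partition)
  {
    initialOffsets.put(partition, 0L);
  }

  @Override
  public Map<KafkaPartition, Long> loadInitialOffsets()
  {
    // Called at activation; returns the offsets the operator should start from.
    return initialOffsets;
  }

  @Override
  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)
  {
    // Called every repartitionCheckInterval; a production version would persist these.
    System.out.println("Kafka offsets processed so far: " + offsetsOfPartitions);
  }
}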
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_2", 
+            "text": "Map  KafkaPartition, Long  loadInitialOffsets() : Specifies the initial offset for consuming messages; called at the activation stage.  updateOffsets(Map KafkaPartition, Long  offsetsOfPartitions) : \u00a0This\nmethod is called at every repartitionCheckInterval to update offsets.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#partitioning", 
+            "text": "The logical instance of the KafkaInputOperator acts as the Partitioner\nas well as a StatsListener. This is because the\nAbstractKafkaInputOperator implements both the\ncom.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener\ninterfaces and provides an implementation of definePartitions(...) and\nprocessStats(...) which makes it auto-scalable.", 
+            "title": "Partitioning"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#response-processstatsbatchedoperatorstats-stats", 
+            "text": "The application master invokes this method on the logical instance with\nthe stats (tuplesProcessedPS, bytesPS, etc.) of each partition.\nRe-partitioning happens based on whether any new Kafka partitions added for\nthe topic or bytesPS and msgPS cross their respective upper bounds.", 
+            "title": "Response processStats(BatchedOperatorStats stats)"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#definepartitions", 
+            "text": "Based on the repartitionRequired field of the Response object which is\nreturned by processStats(...) method, the application master invokes\ndefinePartitions(...) on the logical instance which is also the\npartitioner instance. Dynamic partition can be disabled by setting the\nparameter repartitionInterval value to a negative value.", 
+            "title": "DefinePartitions"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstractsingleportkafkainputoperator", 
+            "text": "This class extends AbstractKafkaInputOperator to emit messages through single output port.", 
+            "title": "AbstractSinglePortKafkaInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#ports", 
+            "text": "outputPort  T : Tuples extracted from Kafka messages are emitted through this port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_3", 
+            "text": "T getTuple(Message msg) : Converts the Kafka message to tuple.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#concrete-classes", 
+            "text": "KafkaSinglePortStringInputOperator: extends  AbstractSinglePortKafkaInputOperator , extracts string from Kafka message.  KafkaSinglePortByteArrayInputOperator: extends  AbstractSinglePortKafkaInputOperator , extracts byte array from Kafka message.", 
+            "title": "Concrete Classes"
+        }, 
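If neither concrete class above fits, a custom subclass only has to implement getTuple(Message msg). Below is a minimal Java sketch, assuming AbstractSinglePortKafkaInputOperator is generic in the tuple type and that the payload of the 0.8.x kafka.message.Message is read via its payload() buffer; the class name and the upper-casing logic are purely illustrative.

import java.nio.ByteBuffer;

import kafka.message.Message;

import com.datatorrent.contrib.kafka.AbstractSinglePortKafkaInputOperator;

// Illustrative sketch: emit each Kafka message payload as an upper-cased String tuple.
public class UpperCaseKafkaInputOperator extends AbstractSinglePortKafkaInputOperator<String>
{
  @Override
  public String getTuple(Message msg)
  {
    ByteBuffer payload = msg.payload();          // payload of the 0.8.x Kafka message
    byte[] bytes = new byte[payload.remaining()];
    payload.get(bytes);                          // copy the bytes out of the buffer
    return new String(bytes).toUpperCase();
  }
}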
+        {
+            "location": "/operators/kafkaInputOperator/#application-example", 
+            "text": "This section builds an Apex application using Kafka input operator.\nBelow is the code snippet:  @ApplicationAnnotation(name =  KafkaApp )\npublic class ExampleKafkaApplication implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration entries)\n  {\n    KafkaSinglePortByteArrayInputOperator input =  dag.addOperator( MessageReader , new KafkaSinglePortByteArrayInputOperator());\n    ConsoleOutputOperator output = dag.addOperator( Output , new ConsoleOutputOperator());\n    dag.addStream( MessageData , input.outputPort, output.input);\n  }\n}  Below is the configuration for \u201ctest\u201d Kafka topic name and\n\u201clocalhost:2181\u201d is the zookeeper forum:  property \n   name dt.operator.MessageReader.prop.topic /name \n   value test /value  /property  property \n   name dt.operator.KafkaInputOperator.prop.zookeeper /nam \n   value localhost:2181 /value  /property", 
+            "title": "Application Example"
+        }, 
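Because the snippet in the entry above loses its quoting and angle brackets in the search index, here is the same 0.8.x application example written out as a self-contained Java class. The operator and stream names follow the snippet; the ConsoleOutputOperator import path and the exact package layout are assumptions based on common Malhar library conventions, so treat this as a sketch rather than the canonical example.

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;

// Reads raw byte[] messages from Kafka and writes them to the console.
@ApplicationAnnotation(name = "KafkaApp")
public class ExampleKafkaApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortByteArrayInputOperator input =
        dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator());
    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
    dag.addStream("MessageData", input.outputPort, output.input);
  }
}

The topic and zookeeper settings are then supplied through the dt.operator.*.prop.* entries shown in the configuration fragment above.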
+        {
+            "location": "/operators/kafkaInputOperator/#kafka-input-operator-for-kafka-09x", 
+            "text": "Package:  org.apache.apex.malhar.kafka  Maven Artifact:  malhar-kafka  This version uses the new 0.9 version of consumer API and works with Kafka broker version 0.9 and later.\nThe operator is fault-tolerant, scalable and supports input from multiple clusters and multiple topics in a single operator instance.", 
+            "title": "Kafka Input Operator for Kafka 0.9.x"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#pre-requisites_1", 
+            "text": "This operator requires version 0.9.0 or later of the Kafka Consumer API.", 
+            "title": "Pre-requisites"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstractkafkainputoperator_1", 
+            "text": "", 
+            "title": "AbstractKafkaInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#ports_1", 
+            "text": "This abstract class doesn't have any ports.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#configuration-properties", 
+            "text": "clusters  - String[]   Mandatory Parameter.  Specifies the Kafka clusters that you want to consume messages from. To configure multi-cluster support, you need to specify the clusters separated by \";\".     topics  - String[]   Mandatory Parameter.  Specified the Kafka topics that you want to consume messages from. If you want multi-topic support, then specify the topics separated by \",\".     strategy  - PartitionStrategy    Operator supports two types of partitioning strategies,  ONE_TO_ONE  and  ONE_TO_MANY .  ONE_TO_ONE : If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances. ONE_TO_MANY : The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion
 . If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.\nDefault Value =  PartitionStrategy.ONE_TO_ONE .      initialPartitionCount  - Integer   When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. \n    Default Value = 1.     repartitionInterval  - Long   Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions. \n    Default Value = 30 Seconds.     repartitionCheckInterval  - Long   Interval specified in milliseconds. This value specifies the minimum interval between two stat checks.\n    Default Value = 5 Seconds.     maxTuplesPerWindow  - Integer   Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. 
 \n    Default value =  MAX_VALUE       initialOffset  - InitialOffset   Indicates the type of offset i.e,  EARLIEST  or  LATEST  or  APPLICATION_OR_EARLIEST  or  APPLICATION_OR_LATEST . \n     LATEST  =  Consume new messages from latest offset in the topic. \n     EARLIEST  =  Consume all messages available in the topic.\n     APPLICATION_OR_EARLIEST  =  Consume messages from committed position from last run. If there is no committed offset, then start consuming from beginning.\n     APPLICATION_OR_LATEST  =  Consumes messages from committed position from last run. If a committed offset is unavailable, then start consuming from latest position.\n    Default value =  InitialOffset.APPLICATION_OR_LATEST     metricsRefreshInterval  - Long   Interval specified in milliseconds. This value specifies the minimum interval between two metric stat updates.\n    Default value = 5 Seconds.     consumerTimeout  - Long   Indicates the  time waiting in poll  when data is not available.\n    Defaul
 t value = 5 Seconds.     holdingBufferSize  - Long   Indicates the maximum number of messages kept in memory for emitting.\n    Default value = 1024.     consumerProps  - Properties   Specify the consumer properties (http://kafka.apache.org/090/documentation.html#newconsumerconfigs) which are not yet set to the operator.     windowDataManager  - WindowDataManager   If set to a value other than the default, such as  FSWindowDataManager , specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window.\n    Default value =  WindowDataManager.NoopWindowDataManager .", 
+            "title": "Configuration properties"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_4", 
+            "text": "void emitTuple(String cluster, ConsumerRecord byte[], byte[]  message) : Abstract method that emits tuples\nextracted from Kafka message.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#concrete-classes_1", 
+            "text": "", 
+            "title": "Concrete Classes"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafkasingleportinputoperator", 
+            "text": "This class extends from AbstractKafkaInputOperator and defines the  getTuple()  method which extracts byte array from Kafka message.", 
+            "title": "KafkaSinglePortInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#ports_2", 
+            "text": "outputPort  byte[] : Tuples extracted from Kafka messages are emitted through this port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#application-example_1", 
+            "text": "This section builds an Apex application using Kafka input operator.\nBelow is the code snippet:  @ApplicationAnnotation(name =  KafkaApp )\npublic class ExampleKafkaApplication implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration entries)\n  {\n    KafkaSinglePortInputOperator input =  dag.addOperator( MessageReader , new KafkaSinglePortInputOperator());\n    ConsoleOutputOperator output = dag.addOperator( Output , new ConsoleOutputOperator());\n    dag.addStream( MessageData , input.outputPort, output.input);\n  }\n}  Below is the configuration for \u201ctest\u201d Kafka topic name and\n\u201clocalhost:9092\u201d is the Broker:  property \n   name dt.operator.MessageReader.prop.topics /name \n   value test /value  /property  property \n   name dt.operator.KafkaInputOperator.prop.clusters /nam \n   value localhost:9092 /value  /property   By adding following lines to properties file, Kafka Input Operator supports mult
 i-topic and multi-cluster:  property \n   name dt.operator.MessageReader.prop.topics /name \n   value test1, test2 /value  /property  property \n   name dt.operator.KafkaInputOperator.prop.clusters /nam \n   value localhost:9092; localhost:9093; localhost:9094 /value  /property   For a full example application project, refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka", 
+            "title": "Application Example"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/", 
+            "text": "JMS INPUT OPERATOR\n\n\nIntroduction: About the JMS Input Operator\n\n\nThe JMS input operator consumes data from a messaging system using the JMS client API. JMS not being a communication protocol, the operator needs an underlying JMS client API library to talk to a messaging system. Currently the operator has been tested with the Amazon SQS and Apache ActiveMQ System brokers via their respective JMS client API libraries.\n\n\nWhy is it needed ?\n\n\nYou will need the operator to read data from a messaging system (e.g. Apache ActiveMQ) via the JMS client API. The operator supports both the publish-subscribe (topics) and point-to-point (queues) modes. The operator currently does not support partitioning and dynamic scalability.\n\n\nJMSBase\n\n\nThis class encapsulates various JMS properties and behaviors and maintains connections with the JMS broker. This is the base class for JMS input and output adaptor operators. Operators should not directly subclass JMSBas
 e but one of the JMS input or output operators.\n\n\nAbstractJMSInputOperator\n\n\nThis abstract implementation serves as the base class for consuming generic messages from an external messaging system. Concrete subclasses implement conversion and emit methods to emit tuples for a concrete type. JMSStringInputOperator is one such subclass in the library used for String messages. JMSObjectInputOperator is another one used for multiple message types where the user has the ability to get String, byte array, Map or POJO messages on the respective output ports.\n\n\nConfiguration Parameters\n\n\nCommon configuration parameters are described here.\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nDescription\n\n\n\n\n\n\nwindowDataManager\n\n\nThis is an instance of \nWindowDataManager\n that implements idempotency. Idempotency ensures that an operator will process the same set of messages in a window before and after a failure. For example, say the operator completed window 10 and failed before or d
 uring window 11. If the operator gets restored at window 10, it will replay the messages of window 10 which were saved from the previous run before the failure. Although important, idempotency comes at a price because an operator needs to persist some state at the end of each window. Default Value = \norg.apache.apex.malhar.lib.wal.FSWindowDataManager\n\n\n\n\n\n\nconnectionFactoryBuilder\n\n\nThe operator uses the builder pattern that requires the user to specify an instance of \ncom.datatorrent.lib.io.jms.JMSBase.ConnectionFactoryBuilder\n. This builder creates the connection factory that encapsulates the underlying JMS client API library (e.g. ActiveMQ or Amazon SQS). By default the operator uses \ncom.datatorrent.lib.io.jms.JMSBase.DefaultConnectionFactoryBuilder\n which is used for ActiveMQ. One of the examples below describes the Amazon SQS use-case. \n\n\n\n\n\n\n\n\nAbstract Methods\n\n\nThe following abstract methods need to be implemented by concrete subclasses.\n\n\nT con
 vert(Message message): This method converts a JMS Message object to type T.\n\n\nvoid emit(T payload): This method emits a tuple given the payload extracted from a JMS message.\n\n\nConcrete Classes\n\n\n\n\n\n\nJMSStringInputOperator :\nThis class extends AbstractJMSInputOperator to deliver String payloads in the tuple.\n\n\n\n\n\n\nJMSObjectInputOperator:\nThis class extends AbstractJMSInputOperator to deliver String, byte array, Map or POJO payloads in the tuple.\n\n\n\n\n\n\nApplication Examples\n\n\nActiveMQ Example\n\n\nThe source code for the tutorial can be found here:\n\n\nhttps://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ\n\n\nThe following code snippet from the example illustrates how the DAG is created:\n\n\n  @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  {\n    JMSStringInputOperator amqInput = dag.addOperator(\namqIn\n, \n        new JMSStringInputOperator());\n\n    LineOutputOperator out = dag.addOperator(\nfileOut\n, new
  LineOutputOperator());\n\n    dag.addStream(\ndata\n, amqInput.output, out.input);\n  }\n\n\n\n\nThe DAG consists of only 2 operators: the \nJMSStringInputOperator\n which is the input operator that feeds received ActiveMQ messages into the output operator \nLineOutputOperator\n which outputs these messages into a file or files.\n\n\nThe default connectionFactoryBuilder supports ActiveMQ so there is no need to set this value. However the following ActiveMQ related values need to be set either from properties files or using the appropriate setter methods in the code:\n\n\n\n\n\n\n\n\n\n\n\n\nValue\n\n\nDescription\n\n\n\n\n\n\nconnectionFactoryProperties\n\n\nThis is a Map of key and value strings and can be set directly from configuration as in the example above. The table below describes the most important properties.\n\n\n\n\n\n\ntopic\n\n\nThis boolean value is set to true for the publish-subscribe case and false for the PTP (point-to-point) case.\n\n\n\n\n\n\nsubject\n\n\nThis 
 is the queue name for PTP (point-to-point) use-case and topic name for the publish-subscribe use case.\n\n\n\n\n\n\ndurable\n\n\nThis boolean value is set to true for durable subscriptions, false otherwise. Durable subscriptions save messages to persistent storage until consumed. Used only when the clientId (see below) is set.\n\n\n\n\n\n\nclientId\n\n\nThe client-ID for this ActiveMQ consumer in the durable subscription mode as described above.\n\n\n\n\n\n\ntransacted\n\n\nThis boolean value is set to true for transacted JMS sessions as described in \n\nSession\n.\n\n\n\n\n\n\nackMode\n\n\nThis string value sets the acknowledgement mode as described in \n\nSession fields\n.\n\n\n\n\n\n\n\n\n\nThe following table describes the string properties to be set in the map that is passed in the connectionFactoryProperties value described above.\n\n\n\n\n\n\n\n\n\n\n\n\nProperty Name\n\n\nDescription\n\n\n\n\n\n\nbrokerURL\n\n\nThe \nconnection URL\n \nused to connect to the ActiveMQ broker
 \n\n\n\n\nuserName\n\n\nThe JMS userName used by connections created by this factory (optional when anonymous access is used)\n\n\n\n\n\n\npassword\n\n\nThe JMS password used for connections created from this factory (optional when anonymous access is used)\n\n\n\n\n\n\n\n\n\nThese properties can be set from the properties.xml file as shown below \n(from the example \nhttps://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ\n ).\n\n\nconfiguration\n\n  \nproperty\n\n    \nname\ndt.operator.amqIn.prop.connectionFactoryProperties.brokerURL\n/name\n\n    \nvalue\nvm://localhost\n/value\n\n  \n/property\n\n  \nproperty\n\n    \nname\ndt.operator.amqIn.prop.subject\n/name\n\n    \nvalue\njms4Amq\n/value\n\n  \n/property\n\n\n/configuration\n                                                                                                        \n\n\n\n\nSQS Example\n\n\nThe source code for the tutorial can be found here:\n\n\nhttps://github.com/DataTorrent/examples/tree/m
 aster/tutorials/jmsSqs\n\n\nThe following code snippet from the example illustrates how the DAG is created:\n\n\n @Override\n public void populateDAG(DAG dag, Configuration conf)\n {\n\n   JMSStringInputOperator sqsInput = dag.addOperator(\nsqsIn\n, \n       new JMSStringInputOperator());\n\n   MyConnectionFactoryBuilder factoryBuilder = new MyConnectionFactoryBuilder();\n\n   factoryBuilder.sqsDevCredsFilename = conf.get(SQSDEV_CREDS_FILENAME_PROPERTY);\n\n   sqsInput.setConnectionFactoryBuilder(factoryBuilder);\n\n   LineOutputOperator out = dag.addOperator(\nfileOut\n, new LineOutputOperator());\n\n   dag.addStream(\ndata\n, sqsInput.output, out.input);\n }\n\n\n\n\nThe DAG consists of only 2 operators: the \nJMSStringInputOperator\n which is the input operator that feeds received SQS messages into the output operator \nLineOutputOperator\n which outputs these messages into a file or files. The code also shows how the AWS/SQS credentials are initialized in the factory builder. \n
 \n\nFor SQS you will have to provide a custom connectionFactoryBuilder as shown in the example above and in \nSQSConnectionFactory.java\n. The builder is typically used to supply AWS region and credential information that cannot be supplied via any JMS interfaces.\n\n\nThe following code snippet shows a typical Builder implementation that can be supplied to the operator. The AWS credentials are supplied via a \nPropertiesFileCredentialsProvider\n object in which sqsCredsFilename is the fully qualified path to a properties file from which the AWS security credentials are to be loaded. For example \n/etc/somewhere/credentials.properties\n\n\nstatic class MyConnectionFactoryBuilder implements JMSBase.ConnectionFactoryBuilder {\n\nString sqsCredsFilename;\n\nMyConnectionFactoryBuilder()\n{\n}\n\n@Override\npublic ConnectionFactory buildConnectionFactory() \n{\n  // Create the connection factory using the properties file credential provider.\n  // Connections this factory creates can tal
 k to the queues in us-east-1 region. \n  SQSConnectionFactory connectionFactory =\n    SQSConnectionFactory.builder()\n      .withRegion(Region.getRegion(Regions.US_EAST_1))\n      .withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(sqsCredsFilename))\n      .build();\n    return connectionFactory;\n  }\n}", 
+            "title": "JMS Input"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#jms-input-operator", 
+            "text": "", 
+            "title": "JMS INPUT OPERATOR"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#introduction-about-the-jms-input-operator", 
+            "text": "The JMS input operator consumes data from a messaging system using the JMS client API. JMS not being a communication protocol, the operator needs an underlying JMS client API library to talk to a messaging system. Currently the operator has been tested with the Amazon SQS and Apache ActiveMQ System brokers via their respective JMS client API libraries.", 
+            "title": "Introduction: About the JMS Input Operator"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#why-is-it-needed", 
+            "text": "You will need the operator to read data from a messaging system (e.g. Apache ActiveMQ) via the JMS client API. The operator supports both the publish-subscribe (topics) and point-to-point (queues) modes. The operator currently does not support partitioning and dynamic scalability.", 
+            "title": "Why is it needed ?"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#jmsbase", 
+            "text": "This class encapsulates various JMS properties and behaviors and maintains connections with the JMS broker. This is the base class for JMS input and output adaptor operators. Operators should not directly subclass JMSBase but one of the JMS input or output operators.", 
+            "title": "JMSBase"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#abstractjmsinputoperator", 
+            "text": "This abstract implementation serves as the base class for consuming generic messages from an external messaging system. Concrete subclasses implement conversion and emit methods to emit tuples for a concrete type. JMSStringInputOperator is one such subclass in the library used for String messages. JMSObjectInputOperator is another one used for multiple message types where the user has the ability to get String, byte array, Map or POJO messages on the respective output ports.", 
+            "title": "AbstractJMSInputOperator"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#configuration-parameters", 
+            "text": "Common configuration parameters are described here.      Parameter  Description    windowDataManager  This is an instance of  WindowDataManager  that implements idempotency. Idempotency ensures that an operator will process the same set of messages in a window before and after a failure. For example, say the operator completed window 10 and failed before or during window 11. If the operator gets restored at window 10, it will replay the messages of window 10 which were saved from the previous run before the failure. Although important, idempotency comes at a price because an operator needs to persist some state at the end of each window. Default Value =  org.apache.apex.malhar.lib.wal.FSWindowDataManager    connectionFactoryBuilder  The operator uses the builder pattern that requires the user to specify an instance of  com.datatorrent.lib.io.jms.JMSBase.ConnectionFactoryBuilder . This builder creates the connection factory that encapsulates the underlying JMS cl
 ient API library (e.g. ActiveMQ or Amazon SQS). By default the operator uses  com.datatorrent.lib.io.jms.JMSBase.DefaultConnectionFactoryBuilder  which is used for ActiveMQ. One of the examples below describes the Amazon SQS use-case.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#abstract-methods", 
+            "text": "The following abstract methods need to be implemented by concrete subclasses.  T convert(Message message): This method converts a JMS Message object to type T.  void emit(T payload): This method emits a tuple given the payload extracted from a JMS message.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#concrete-classes", 
+            "text": "JMSStringInputOperator :\nThis class extends AbstractJMSInputOperator to deliver String payloads in the tuple.    JMSObjectInputOperator:\nThis class extends AbstractJMSInputOperator to deliver String, byte array, Map or POJO payloads in the tuple.", 
+            "title": "Concrete Classes"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#application-examples", 
+            "text": "", 
+            "title": "Application Examples"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#activemq-example", 
+            "text": "The source code for the tutorial can be found here:  https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ  The following code snippet from the example illustrates how the DAG is created:    @Override\n  public void populateDAG(DAG dag, Configuration conf)\n  {\n    JMSStringInputOperator amqInput = dag.addOperator( amqIn , \n        new JMSStringInputOperator());\n\n    LineOutputOperator out = dag.addOperator( fileOut , new LineOutputOperator());\n\n    dag.addStream( data , amqInput.output, out.input);\n  }  The DAG consists of only 2 operators: the  JMSStringInputOperator  which is the input operator that feeds received ActiveMQ messages into the output operator  LineOutputOperator  which outputs these messages into a file or files.  The default connectionFactoryBuilder supports ActiveMQ so there is no need to set this value. However the following ActiveMQ related values need to be set either from properties files or using the appropriate
  setter methods in the code:       Value  Description    connectionFactoryProperties  This is a Map of key and value strings and can be set directly from configuration as in the example above. The table below describes the most important properties.    topic  This boolean value is set to true for the publish-subscribe case and false for the PTP (point-to-point) case.    subject  This is the queue name for PTP (point-to-point) use-case and topic name for the publish-subscribe use case.    durable  This boolean value is set to true for durable subscriptions, false otherwise. Durable subscriptions save messages to persistent storage until consumed. Used only when the clientId (see below) is set.    clientId  The client-ID for this ActiveMQ consumer in the durable subscription mode as described above.    transacted  This boolean value is set to true for transacted JMS sessions as described in  Session .    ackMode  This string value sets the acknowledgement mode as described in  Sessio
 n fields .     The following table describes the string properties to be set in the map that is passed in the connectionFactoryProperties value described above.       Property Name  Description    brokerURL  The  connection URL  \nused to connect to the ActiveMQ broker   userName  The JMS userName used by connections created by this factory (optional when anonymous access is used)    password  The JMS password used for connections created from this factory (optional when anonymous access is used)     These properties can be set from the properties.xml file as shown below \n(from the example  https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ  ).  configuration \n   property \n     name dt.operator.amqIn.prop.connectionFactoryProperties.brokerURL /name \n     value vm://localhost /value \n   /property \n   property \n     name dt.operator.amqIn.prop.subject /name \n     value jms4Amq /value \n   /property  /configuration", 
+            "title": "ActiveMQ Example"
+        }, 
+        {
+            "location": "/operators/jmsInputOperator/#sqs-example", 
+            "text": "The source code for the tutorial can be found here:  https://github.com/DataTorrent/examples/tree/master/tutorials/jmsSqs  The following code snippet from the example illustrates how the DAG is created:   @Override\n public void populateDAG(DAG dag, Configuration conf)\n {\n\n   JMSStringInputOperator sqsInput = dag.addOperator( sqsIn , \n       new JMSStringInputOperator());\n\n   MyConnectionFactoryBuilder factoryBuilder = new MyConnectionFactoryBuilder();\n\n   factoryBuilder.sqsDevCredsFilename = conf.get(SQSDEV_CREDS_FILENAME_PROPERTY);\n\n   sqsInput.setConnectionFactoryBuilder(factoryBuilder);\n\n   LineOutputOperator out = dag.addOperator( fileOut , new LineOutputOperator());\n\n   dag.addStream( data , sqsInput.output, out.input);\n }  The DAG consists of only 2 operators: the  JMSStringInputOperator  which is the input operator that feeds received SQS messages into the output operator  LineOutputOperator  which outputs these messages into a file or fil
 es. The code also shows how the AWS/SQS credentials are initialized in the factory builder.   For SQS you will have to provide a custom connectionFactoryBuilder as shown in the example above and in  SQSConnectionFactory.java . The builder is typically used to supply AWS region and credential information that cannot be supplied via any JMS interfaces.  The following code snippet shows a typical Builder implementation that can be supplied to the operator. The AWS credentials are supplied via a  PropertiesFileCredentialsProvider  object in which sqsCredsFilename is the fully qualified path to a properties file from which the AWS security credentials are to be loaded. For example  /etc/somewhere/credentials.properties  static class MyConnectionFactoryBuilder implements JMSBase.ConnectionFactoryBuilder {\n\nString sqsCredsFilename;\n\nMyConnectionFactoryBuilder()\n{\n}\n\n@Override\npublic ConnectionFactory buildConnectionFactory() \n{\n  // Create the connection factory using the proper
 ties file credential provider.\n  // Connections this factory creates can talk to the queues in us-east-1 region. \n  SQSConnectionFactory connectionFactory =\n    SQSConnectionFactory.builder()\n      .withRegion(Region.getRegion(Regions.US_EAST_1))\n      .withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(sqsCredsFilename))\n      .build();\n    return connectionFactory;\n  }\n}", 
+            "title": "SQS Example"
+        }, 
+        {
+            "location": "/operators/file_splitter/", 
+            "text": "File Splitter\n\n\nThis is a simple operator whose main function is to split a file virtually and create metadata describing the files and the splits. \n\n\nWhy is it needed?\n\n\nIt is a common operation to read a file and parse it. This operation can be parallelized by having multiple partitions of such operators and each partition operating on different files. However, at times when a file is large then a single partition reading it can become a bottleneck.\nIn these cases, throughput can be increased if instances of the partitioned operator can read and parse non-overlapping sets of file blocks. This is where file splitter comes in handy. It creates metadata of blocks of file which serves as tasks handed out to downstream operator partitions. \nThe downstream partitions can read/parse the block without the need of interacting with other partitions.\n\n\nClass Diagram\n\n\n\n\nAbstractFileSplitter\n\n\nThe abstract implementation defines the logic of processi
 ng \nFileInfo\n. This comprises the following tasks -  \n\n\n\n\n\n\nbuilding \nFileMetadata\n per file and emitting it. This metadata contains the file information such as filepath, no. of blocks in it, length of the file, all the block ids, etc.\n\n\n\n\n\n\ncreating \nBlockMetadataIterator\n from \nFileMetadata\n. The iterator lazy-loads the block metadata when needed. We use an iterator because the no. of blocks in a file can be huge if the block size is small and loading all of them at once in memory may cause out of memory errors.\n\n\n\n\n\n\nretrieving \nBlockMetadata.FileBlockMetadata\n from the block metadata iterator and emitting it. The FileBlockMetadata contains the block id, start offset of the block, length of file in the block, etc. The number of block metadata emitted per window is controlled by the \nblocksThreshold\n setting, which by default is 1.  \n\n\n\n\n\n\nThe main utility method that performs all the above tasks is the \nprocess()\n method. Concrete implementa
 tions can invoke this method whenever they have data to process.\n\n\nPorts\n\n\nDeclares only output ports on which file metadata and block metadata are emitted.\n\n\n\n\nfilesMetadataOutput: metadata for each file is emitted on this port. \n\n\nblocksMetadataOutput: metadata for each block is emitted on this port. \n\n\n\n\nprocess()\n method\n\n\nWhen process() is invoked, any pending blocks from the current file are emitted on the 'blocksMetadataOutput' port. If the threshold for blocks per window is still not met then a new input file is processed - corresponding metadata is emitted on 'filesMetadataOutput' and more of its blocks are emitted. This operation is repeated until the \nblocksThreshold\n is reached or there are no more new files.\n\n\n  protected void process()\n  {\n    if (blockMetadataIterator != null && blockCount < blocksThreshold) {\n      emitBlockMetadata();\n    }\n\n    FileInfo fileInfo;\n    while (blockCount < blocksThreshold && (fileInfo = getFileInfo
 ()) != null) {\n      if (!processFileInfo(fileInfo)) {\n        break;\n      }\n    }\n  }\n\n\n\n\nAbstract methods\n\n\n\n\n\n\nFileInfo getFileInfo()\n: called from within the \nprocess()\n and provides the next file to process.\n\n\n\n\n\n\nlong getDefaultBlockSize()\n: provides the block size which is used when user hasn't configured the size.\n\n\n\n\n\n\nFileStatus getFileStatus(Path path)\n: provides the \norg.apache.hadoop.fs.FileStatus\n instance for a path.   \n\n\n\n\n\n\nConfiguration\n\n\n\n\nblockSize\n: size of a block.\n\n\nblocksThreshold\n: threshold on the number of blocks emitted by file splitter every window. This setting is used for throttling the work for downstream operators.\n\n\n\n\nFileSplitterBase\n\n\nSimple operator that receives tuples of type \nFileInfo\n on its \ninput\n port. \nFileInfo\n contains the information (currently just the file path) about the file which this operator uses to create file metadata and block metadata.\n\n\nExample applica
 tion\n\n\nThis is a simple sub-dag that demonstrates how FileSplitterBase can be plugged into an application.\n\n\n\nThe upstream operator emits tuples of type \nFileInfo\n on its output port which is connected to splitter input port. The downstream receives tuples of type \nBlockMetadata.FileBlockMetadata\n from the splitter's block metadata output port.\n\n\npublic class ApplicationWithBaseSplitter implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    JMSInput input = dag.addOperator(\nInput\n, new JMSInput());\n    FileSplitterBase splitter = dag.addOperator(\nSplitter\n, new FileSplitterBase());\n    FSSliceReader blockReader = dag.addOperator(\nBlockReader\n, new FSSliceReader());\n    ...\n    dag.addStream(\nfile-info\n, input.output, splitter.input);\n    dag.addStream(\nblock-metadata\n, splitter.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    ...\n  }\n\n  public static class JMSInput extends
  AbstractJMSInputOperator\nAbstractFileSplitter.FileInfo\n\n  {\n\n    public final transient DefaultOutputPort\nAbstractFileSplitter.FileInfo\n output = new DefaultOutputPort\n();\n\n    @Override\n    protected AbstractFileSplitter.FileInfo convert(Message message) throws JMSException\n    {\n      //assuming the message is a text message containing the absolute path of the file.\n      return new AbstractFileSplitter.FileInfo(null, ((TextMessage)message).getText());\n    }\n\n    @Override\n    protected void emit(AbstractFileSplitter.FileInfo payload)\n    {\n      output.emit(payload);\n    }\n  }\n}\n\n\n\n\nPorts\n\n\nDeclares an input port on which it receives tuples from the upstream operator. Output ports are inherited from AbstractFileSplitter.\n\n\n\n\ninput: non optional port on which tuples of type \nFileInfo\n are received.\n\n\n\n\nConfiguration\n\n\n\n\nfile\n: path of the file from which the filesystem is inferred. FileSplitter creates an instance of \norg.apache.h
 adoop.fs.FileSystem\n which is why this path is needed.  \n\n\n\n\nFileSystem.newInstance(new Path(file).toUri(), new Configuration());\n\n\n\n\nThe fs instance is then used to fetch the default block size and \norg.apache.hadoop.fs.FileStatus\n for each file path.\n\n\nFileSplitterInput\n\n\nThis is an input operator that discovers files itself. The scanning of the directories for new files is asynchronous which is handled by \nTimeBasedDirectoryScanner\n. The function of TimeBasedDirectoryScanner is to periodically scan specified directories and find files which were newly added or modified. The interaction between the operator and the scanner is depicted in the diagram below.\n\n\n\n\nExample application\n\n\nThis is a simple sub-dag that demonstrates how FileSplitterInput can be plugged into an application.\n\n\n\n\nSplitter is the input operator here that sends block metadata to the downstream BlockReader.\n\n\n  @Override\n  public void populateDAG(DAG dag, Configuration confi
 guration)\n  {\n    FileSplitterInput input = dag.addOperator(\nInput\n, new FileSplitterInput());\n    FSSliceReader reader = dag.addOperator(\nBlock Reader\n, new FSSliceReader());\n    ...\n    dag.addStream(\nblock-metadata\n, input.blocksMetadataOutput, reader.blocksMetadataInput);\n    ...\n  }\n\n\n\n\n\nPorts\n\n\nSince it is an input operator there are no input ports and output ports are inherited from AbstractFileSplitter.\n\n\nConfiguration\n\n\n\n\nscanner\n: the component that scans directories asynchronously. It is of type \ncom.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner\n. The basic implementation of TimeBasedDirectoryScanner can be customized by users.  \n\n\n\n\na. \nfiles\n: comma separated list of directories to scan.  \n\n\nb. \nrecursive\n: flag that controls whether the directories should be scanned recursively.  \n\n\nc. \nscanIntervalMillis\n: interval specified in milliseconds after which another scan iteration is triggered.  \n\n\nd. \nfil
 ePatternRegularExp\n: regular expression for accepted file names.  \n\n\ne. \ntrigger\n: a flag that triggers a scan iteration instantly. If the scanner thread is idling then it will initiate a scan immediately; otherwise, if a scan is in progress, the new iteration will be triggered immediately after the completion of the current one.\n2. \nidempotentStorageManager\n: by default FileSplitterInput is idempotent. \nIdempotency ensures that the operator will process the same set of files/blocks in a window if it has seen that window previously, i.e., before a failure. For example, let's say the operator completed window 10 and failed somewhere during window 11. If the operator gets restored at window 10 then it will process the same file/block again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with a higher cost because at the end of each window the operator needs to persist some state with respect to that window. Therefore, if one
 doesn't care about idempotency then this property can be set to an instance of \ncom.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager\n.\n\n\nHandling of split records\n\n\nSplitting of files to create tasks for the downstream operator needs to be a simple operation that doesn't consume a lot of resources and is fast. This is why the file splitter doesn't open files to read. The downside is that if the file contains records, a record may be split across adjacent blocks. Handling of this is left to the downstream operator.\n\n\nWe have created block readers in the Apex Malhar library that handle line splits efficiently. The two line readers, \nAbstractFSLineReader\n and \nAbstractFSReadAheadLineReader\n, can be found here: \nAbstractFSBlockReader\n.", 
+            "title": "File Splitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#file-splitter", 
+            "text": "This is a simple operator whose main function is to split a file virtually and create metadata describing the files and the splits.", 
+            "title": "File Splitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#why-is-it-needed", 
+            "text": "It is a common operation to read a file and parse it. This operation can be parallelized by having multiple partitions of such operators and each partition operating on different files. However, at times when a file is large then a single partition reading it can become a bottleneck.\nIn these cases, throughput can be increased if instances of the partitioned operator can read and parse non-overlapping sets of file blocks. This is where file splitter comes in handy. It creates metadata of blocks of file which serves as tasks handed out to downstream operator partitions. \nThe downstream partitions can read/parse the block without the need of interacting with other partitions.", 
+            "title": "Why is it needed?"
+        }, 
+        {
+            "location": "/operators/file_splitter/#class-diagram", 
+            "text": "", 
+            "title": "Class Diagram"
+        }, 
+        {
+            "location": "/operators/file_splitter/#abstractfilesplitter", 
+            "text": "The abstract implementation defines the logic of processing  FileInfo . This comprises the following tasks -      building  FileMetadata  per file and emitting it. This metadata contains the file information such as filepath, no. of blocks in it, length of the file, all the block ids, etc.    creating  BlockMetadataIterator  from  FileMetadata . The iterator lazy-loads the block metadata when needed. We use an iterator because the no. of blocks in a file can be huge if the block size is small and loading all of them at once in memory may cause out of memory errors.    retrieving  BlockMetadata.FileBlockMetadata  from the block metadata iterator and emitting it. The FileBlockMetadata contains the block id, start offset of the block, length of file in the block, etc. The number of block metadata emitted per window are controlled by  blocksThreshold  setting which by default is 1.      The main utility method that performs all the above tasks is the  process()  met
 hod. Concrete implementations can invoke this method whenever they have data to process.", 
+            "title": "AbstractFileSplitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#ports", 
+            "text": "Declares only output ports on which file metadata and block metadata are emitted.   filesMetadataOutput: metadata for each file is emitted on this port.   blocksMetadataOutput: metadata for each block is emitted on this port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/file_splitter/#abstract-methods", 
+            "text": "FileInfo getFileInfo() : called from within the  process()  and provides the next file to process.    long getDefaultBlockSize() : provides the block size which is used when user hasn't configured the size.    FileStatus getFileStatus(Path path) : provides the  org.apache.hadoop.fs.FileStatus  instance for a path.", 
+            "title": "Abstract methods"
+        }, 
+        {
+            "location": "/operators/file_splitter/#configuration", 
+            "text": "blockSize : size of a block.  blocksThreshold : threshold on the number of blocks emitted by file splitter every window. This setting is used for throttling the work for downstream operators.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/file_splitter/#filesplitterbase", 
+            "text": "Simple operator that receives tuples of type  FileInfo  on its  input  port.  FileInfo  contains the information (currently just the file path) about the file which this operator uses to create file metadata and block metadata.", 
+            "title": "FileSplitterBase"
+        }, 
+        {
+            "location": "/operators/file_splitter/#example-application", 
+            "text": "This is a simple sub-dag that demonstrates how FileSplitterBase can be plugged into an application.  The upstream operator emits tuples of type  FileInfo  on its output port which is connected to splitter input port. The downstream receives tuples of type  BlockMetadata.FileBlockMetadata  from the splitter's block metadata output port.  public class ApplicationWithBaseSplitter implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    JMSInput input = dag.addOperator( Input , new JMSInput());\n    FileSplitterBase splitter = dag.addOperator( Splitter , new FileSplitterBase());\n    FSSliceReader blockReader = dag.addOperator( BlockReader , new FSSliceReader());\n    ...\n    dag.addStream( file-info , input.output, splitter.input);\n    da

<TRUNCATED>