You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/29 20:10:16 UTC

[03/12] apex-site git commit: Adding malhar-3.5.0 documentation

http://git-wip-us.apache.org/repos/asf/apex-site/blob/37ae8b76/docs/malhar-3.5/mkdocs/search_index.json
----------------------------------------------------------------------
diff --git a/docs/malhar-3.5/mkdocs/search_index.json b/docs/malhar-3.5/mkdocs/search_index.json
new file mode 100644
index 0000000..524fd0a
--- /dev/null
+++ b/docs/malhar-3.5/mkdocs/search_index.json
@@ -0,0 +1,524 @@
+{
+    "docs": [
+        {
+            "location": "/", 
+            "text": "Apache Apex Malhar\n\n\nApache Apex Malhar is an open source operator and codec library that can be used with the \nApache Apex\n platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of demos applications, demonstrating operator features and capabilities.\n\n\n\n\nCapabilities common across Malhar operators\n\n\nFor most streaming platforms, connectors are afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the platform. As a result they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have following core streaming runtime capabilities\n\n\n\n\nFault tolerance\n \u2013 Malhar operators where appli
 cable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario.\n\n\nProcessing guarantees\n \u2013 Malhar operators where applicable provide out of the box support for ALL three processing guarantees \u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code.  Some operators, like MQTT operator, deal with source systems that can not track processed data and hence need the operators to keep track of the data.  Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this.  Finally for databases that support transactions or support any sort of atomic batch operations Malhar operators can do exactly once down to the tuple level.\n\n\nDynamic updates\n \u2013 Based on changing business conditions you often have to tweak several parameters used by the operators in your streaming application without incur
 ring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application.\n\n\nEase of extensibility\n \u2013 Malhar operators are based on templates that are easy to extend.\n\n\nPartitioning support\n \u2013 In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system.  For example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.\n\n\n\n\nOperator Library Overview\n\n\nInput/output connectors\n\n\nBelow is a summary of the various sub categories of input and output operators. Input operators also have a corresponding output operator\n\n\n\n\nFile Systems\n \u2013 Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the applica
 tion is running in AWS.  Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share.  Apex supports input \n output operators for HDFS, S3, NFS \n Local Files.  There are also File Splitter and Block Reader operators, which can accelecate processing of large files by splitting and paralellizing the work across non-overlapping sets of file blocks.\n\n\nRelational Databases\n \u2013 Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational dashboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.\n\n\nNoSQL Databases\n \u2013 NoSQL key-value pair databases like Cassandra \n HBase are a common part of streaming analytics applicati
 on architectures to lookup reference data or store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB.\n\n\nMessaging Systems\n \u2013 Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises.  Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages.\n\n\nNotification Systems\n \u2013 Malhar includes an operator for sending notifications via SMTP.\n\n\nIn-memory Databases \n Caching platforms\n - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar has operators for memcached and Redis.\n\n\nSocial Media\n - Malhar includes an operator to connect to the popular Twitter stream fire hose.\n\n\nProtocols\n - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and MQT
 T.\n\n\n\n\nParsers\n\n\nThere are many industry vertical specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example in the Telco space, a Java based CDR parser can be directly plugged into Apache Apex operator. To further simplify development experience, Malhar also provides some operators for parsing common formats like XML (DOM \n SAX), JSON (flat map converter), Apache log files, syslog, etc.\n\n\nStream manipulation\n\n\nStreaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW.  Malhar has several operators to perform the common stream manipulation actions like \u2013 GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Out
 er join, Select, Update etc.\n\n\nCompute\n\n\nOne of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support variety of common compute functions as the user then has to worry about making these scalable, fault tolerant, stateful, etc.  Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators.\n\n\nBelow is just a snapshot of the compute operators available in Malhar\n\n\n\n\nStatistics and math - Various mathematical and statistical computations over application defined time windows.\n\n\nFiltering and pattern matching\n\n\nSorting, maps, frequency, TopN, BottomN\n\n\nRandom data generators\n\n\n\n\nLanguages Support\n\n\nMigrating to a new platform often requires re-use of the existing code that would be diffic
 ult or time-consuming to re-write.  With this in mind, Malhar supports invocation of code written in other languages by wrapping them in one of the library operators, and allows execution of software written in:\n\n\n\n\nJavaScript\n\n\nPython\n\n\nR\n\n\nRuby", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#apache-apex-malhar", 
+            "text": "Apache Apex Malhar is an open source operator and codec library that can be used with the  Apache Apex  platform to build real-time streaming applications.  Enabling users to extract value quickly, Malhar operators help get data in, analyze it in real-time, and get data out of Hadoop.  In addition to the operators, the library contains a number of demos applications, demonstrating operator features and capabilities.", 
+            "title": "Apache Apex Malhar"
+        }, 
+        {
+            "location": "/#capabilities-common-across-malhar-operators", 
+            "text": "For most streaming platforms, connectors are afterthoughts and often end up being simple \u2018bolt-ons\u2019 to the platform. As a result they often cause performance issues or data loss when put through failure scenarios and scalability requirements. Malhar operators do not face these issues as they were designed to be integral parts of Apex. Hence, they have following core streaming runtime capabilities   Fault tolerance  \u2013 Malhar operators where applicable have fault tolerance built in. They use the checkpoint capability provided by the framework to ensure that there is no data loss under ANY failure scenario.  Processing guarantees  \u2013 Malhar operators where applicable provide out of the box support for ALL three processing guarantees \u2013 exactly once, at-least once, and at-most once WITHOUT requiring the user to write any additional code.  Some operators, like MQTT operator, deal with source systems that can not track processed data and hence n
 eed the operators to keep track of the data.  Malhar has support for a generic operator that uses alternate storage like HDFS to facilitate this.  Finally for databases that support transactions or support any sort of atomic batch operations Malhar operators can do exactly once down to the tuple level.  Dynamic updates  \u2013 Based on changing business conditions you often have to tweak several parameters used by the operators in your streaming application without incurring any application downtime. You can also change properties of a Malhar operator at runtime without having to bring down the application.  Ease of extensibility  \u2013 Malhar operators are based on templates that are easy to extend.  Partitioning support  \u2013 In streaming applications the input data stream often needs to be partitioned based on the contents of the stream. Also for operators that ingest data from external systems partitioning needs to be done based on the capabilities of the external system.  Fo
 r example with Kafka, the operator can automatically scale up or down based on the changes in the number of Kafka partitions.", 
+            "title": "Capabilities common across Malhar operators"
+        }, 
+        {
+            "location": "/#operator-library-overview", 
+            "text": "", 
+            "title": "Operator Library Overview"
+        }, 
+        {
+            "location": "/#inputoutput-connectors", 
+            "text": "Below is a summary of the various sub categories of input and output operators. Input operators also have a corresponding output operator   File Systems  \u2013 Most streaming analytics use cases require the data to be stored in HDFS or perhaps S3 if the application is running in AWS.  Users often need to re-run their streaming analytical applications against historical data or consume data from upstream processes that are perhaps writing to some NFS share.  Apex supports input   output operators for HDFS, S3, NFS   Local Files.  There are also File Splitter and Block Reader operators, which can accelecate processing of large files by splitting and paralellizing the work across non-overlapping sets of file blocks.  Relational Databases  \u2013 Most stream processing use cases require some reference data lookups to enrich, tag or filter streaming data. There is also a need to save results of the streaming analytical computation to a database so an operational das
 hboard can see them. Apex supports a JDBC operator so you can read/write data from any JDBC compliant RDBMS like Oracle, MySQL, Sqlite, etc.  NoSQL Databases  \u2013 NoSQL key-value pair databases like Cassandra   HBase are a common part of streaming analytics application architectures to lookup reference data or store results.  Malhar has operators for HBase, Cassandra, Accumulo, Aerospike, MongoDB, and CouchDB.  Messaging Systems  \u2013 Kafka, JMS, and similar systems are the workhorses of messaging infrastructure in most enterprises.  Malhar has a robust, industry-tested set of operators to read and write Kafka, JMS, ZeroMQ, and RabbitMQ messages.  Notification Systems  \u2013 Malhar includes an operator for sending notifications via SMTP.  In-memory Databases   Caching platforms  - Some streaming use cases need instantaneous access to shared state across the application. Caching platforms and in-memory databases serve this purpose really well. To support these use cases, Malhar
  has operators for memcached and Redis.  Social Media  - Malhar includes an operator to connect to the popular Twitter stream fire hose.  Protocols  - Malhar provides connectors that can communicate in HTTP, RSS, Socket, WebSocket, FTP, and MQTT.", 
+            "title": "Input/output connectors"
+        }, 
+        {
+            "location": "/#parsers", 
+            "text": "There are many industry vertical specific data formats that a streaming application developer might need to parse. Often there are existing parsers available for these that can be directly plugged into an Apache Apex application. For example in the Telco space, a Java based CDR parser can be directly plugged into Apache Apex operator. To further simplify development experience, Malhar also provides some operators for parsing common formats like XML (DOM   SAX), JSON (flat map converter), Apache log files, syslog, etc.", 
+            "title": "Parsers"
+        }, 
+        {
+            "location": "/#stream-manipulation", 
+            "text": "Streaming data inevitably needs processing to clean, filter, tag, summarize, etc. The goal of Malhar is to enable the application developer to focus on WHAT needs to be done to the stream to get it in the right format and not worry about the HOW.  Malhar has several operators to perform the common stream manipulation actions like \u2013 GroupBy, Join, Distinct/Unique, Limit, OrderBy, Split, Sample, Inner join, Outer join, Select, Update etc.", 
+            "title": "Stream manipulation"
+        }, 
+        {
+            "location": "/#compute", 
+            "text": "One of the most important promises of a streaming analytics platform like Apache Apex is the ability to do analytics in real-time. However delivering on the promise becomes really difficult when the platform does not provide out of the box operators to support variety of common compute functions as the user then has to worry about making these scalable, fault tolerant, stateful, etc.  Malhar takes this responsibility away from the application developer by providing a variety of out of the box computational operators.  Below is just a snapshot of the compute operators available in Malhar   Statistics and math - Various mathematical and statistical computations over application defined time windows.  Filtering and pattern matching  Sorting, maps, frequency, TopN, BottomN  Random data generators", 
+            "title": "Compute"
+        }, 
+        {
+            "location": "/#languages-support", 
+            "text": "Migrating to a new platform often requires re-use of the existing code that would be difficult or time-consuming to re-write.  With this in mind, Malhar supports invocation of code written in other languages by wrapping them in one of the library operators, and allows execution of software written in:   JavaScript  Python  R  Ruby", 
+            "title": "Languages Support"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/", 
+            "text": "KAFKA INPUT OPERATOR\n\n\nIntroduction: About Kafka Input Operator\n\n\nThis is an input operator that consumes data from Kafka messaging system for further processing in Apex. Kafka Input Operator is an fault-tolerant and scalable Malhar Operator.\n\n\nWhy is it needed ?\n\n\nKafka is a pull-based and distributed publish subscribe messaging system, topics are partitioned and replicated across\nnodes. Kafka input operator is needed when you want to read data from multiple\npartitions of a Kafka topic in parallel in an Apex application.\n\n\nAbstractKafkaInputOperator\n\n\nThis is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn\u2019t have any ports.\n\n\n\n\nConfiguration Parameters\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nDescription\n\n\n\n\n\n\nmaxTuplesPerWindow\n\n\nControls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. Default
  value = MAX_VALUE \n\n\n\n\n\n\nidempotentStorageManager\n\n\nThis is an instance of IdempotentStorageManager. Idempotency ensures that the operator will process the same set of messages in a window before and after a failure. For example, let's say the operator completed window 10 and failed somewhere between window 11. If the operator gets restored at window 10 then it will process the same messages again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Default Value = com.datatorrent.lib.io.IdempotentStorageManager.\nNoopIdempotentStorageManager\n\n\n\n\n\n\nstrategy\n\n\nOperator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.\n\n\nONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equ
 als the number of operator instances.\n\n\nONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.\nDefault Value = ONE_TO_ONE\n\n\n\n\n\n\nmsgRateUpperBound\n\n\nMaximum messages upper bound. Operator repartitions when the \nmsgProcessedPS\n exceeds this bound. \nmsgProcessedPS\n is the average number of messages processed per second by this operator.\n\n\n\n\n\n\nbyteRateUpperBound\n\n\nMaximum bytes upper bound. Operator repartitions when the \nbytesPS\n exceeds this bound. \nbytesPS\n is the average number of bytes process
 ed per second by this operator.\n\n\n\n\n\n\n\n\noffsetManager\n\n\nThis is an optional parameter that is useful when the application restarts or start at specific offsets (offsets are explained below)\n\n\n\n\n\n\nrepartitionInterval\n\n\nInterval specified in milliseconds. This value specifies the minimum time required between two repartition actions. Default Value = 30 Seconds\n\n\n\n\n\n\nrepartitionCheckInterval\n\n\nInterval specified in milliseconds. This value specifies the minimum interval between two offset updates. Default Value = 5 Seconds\n\n\n\n\n\n\ninitialPartitionCount\n\n\nWhen the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. Default Value = 1\n\n\n\n\n\n\nconsumer\n\n\nThis is an instance of com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of SimpleKafkaConsumer.\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\nvoid emitTuple(Message message): Abstract method that emits tuples\nextracted f
 rom Kafka message.\n\n\nKafkaConsumer\n\n\nThis is an abstract implementation of Kafka consumer. It sends the fetch\nrequests to the leading brokers of Kafka partitions. For each request,\nit receives the set of messages and stores them into the buffer which is\nArrayBlockingQueue. SimpleKafkaConsumer\u00a0which extends\nKafkaConsumer and serves the functionality of Simple Consumer API and\nHighLevelKafkaConsumer which extends KafkaConsumer and \u00a0serves the\nfunctionality of High Level Consumer API.\n\n\nPre-requisites\n\n\nThis operator referred the Kafka Consumer API of version\n0.8.1.1. So, this operator will work with any 0.8.x and 0.7.x version of Apache Kafka.\n\n\nConfiguration Parameters\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nType\n\n\nDefault\n\n\nDescription\n\n\n\n\n\n\nzookeeper\n\n\nString\n\n\n\n\nSpecifies the zookeeper quorum of Kafka clusters that you want to consume messages from. zookeeper \u00a0is a string in the form of hostname1:port1,hostname2:por
 t2,hostname3:port3 \u00a0where hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of zookeeper server. \u00a0If the topic name is the same across the Kafka clusters and want to consume data from these clusters, then configure the zookeeper as follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5,c3::hs6:p6\n\n\nwhere\n\n\nc1,c2,c3 indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is optional in case of single cluster\n\n\n\n\n\n\ncacheSize\n\n\nint\n\n\n1024\n\n\nMaximum of buffered messages hold in memory.\n\n\n\n\n\n\ntopic\n\n\nString\n\n\ndefault_topic\n\n\nIndicates the name of the topic.\n\n\n\n\n\n\ninitialOffset\n\n\nString\n\n\nlatest\n\n\nIndicates the type of offset i.e, \u201cearliest or latest\u201d. If initialOffset is \u201clatest\u201d, then the operator consumes messages from latest point of Kafka queue. If initialOffset is \u201cearliest\u201d, then the opera
 tor consumes messages starting from message queue. This can be overridden by OffsetManager.\n\n\n\n\n\n\n\n\n\nAbstract Methods\n\n\n\n\nvoid commitOffset(): Commit the offsets at checkpoint.\n\n\nMap \nKafkaPartition, Long\n getCurrentOffsets(): Return the current\n    offset status.\n\n\nresetPartitionsAndOffset(Set \nKafkaPartition\n partitionIds,\n    Map \nKafkaPartition, Long\n startOffset): Reset the partitions with\n    parittionIds and offsets with startOffset.\n\n\n\n\nConfiguration Parameters\u00a0for SimpleKafkaConsumer\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\nParameter\n\n\nType\n\n\nDefault\n\n\nDescription\n\n\n\n\n\n\nbufferSize\n\n\nint\n\n\n1 MB\n\n\nSpecifies the maximum total size of messages for each fetch request.\n\n\n\n\n\n\nmetadataRefreshInterval\n\n\nint\n\n\n30 Seconds\n\n\nInterval in between refresh the metadata change(broker change) in milliseconds. Enabling metadata refresh guarantees an automatic reconnect when a new broker is elected as the host. A value of
  -1 disables this feature.\n\n\n\n\n\n\nmetadataRefreshRetryLimit\n\n\nint\n\n\n-1\n\n\nSpecifies the maximum brokers' metadata refresh retry limit. -1 means unlimited retry.\n\n\n\n\n\n\n\n\n\nOffsetManager\n\n\nThis is an interface for offset management and is useful when consuming data\nfrom specified offsets. Updates the offsets for all the Kafka partitions\nperiodically. Below is the code snippet:\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\n\n\npublic interface OffsetManager\n{\n  public Map\nKafkaPartition, Long\n loadInitialOffsets();\n  public void updateOffsets(Map\nKafkaPartition, Long\n offsetsOfPartitions);\n}\n\n\n\n\nAbstract Methods\n\n\nMap \nKafkaPartition, Long\n loadInitialOffsets(): Specifies the initial offset for consuming messages; called at the activation stage.\n\n\nupdateOffsets(Map \nKafkaPartition, Long\n offsetsOfPartitions): \u00a0This\nmethod is called at every repartitionCheckInterval to update offsets.\n\n\nPartitioning\n\n\nThe logical instance
  of the KafkaInputOperator acts as the Partitioner\nas well as a StatsListener. This is because the\nAbstractKafkaInputOperator implements both the\ncom.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener\ninterfaces and provides an implementation of definePartitions(...) and\nprocessStats(...) which makes it auto-scalable.\n\n\nResponse processStats(BatchedOperatorStats stats)\n\n\nThe application master invokes this method on the logical instance with\nthe stats (tuplesProcessedPS, bytesPS, etc.) of each partition.\nRe-partitioning happens based on whether any new Kafka partitions added for\nthe topic or bytesPS and msgPS cross their respective upper bounds.\n\n\nDefinePartitions\n\n\nBased on the repartitionRequired field of the Response object which is\nreturned by processStats(...) method, the application master invokes\ndefinePartitions(...) on the logical instance which is also the\npartitioner instance. Dynamic partition can be disabled by setting the\nparamete
 r repartitionInterval value to a negative value.\n\n\nAbstractSinglePortKafkaInputOperator\n\n\nThis class extends AbstractKafkaInputOperator and having single output\nport, will emit the messages through this port.\n\n\nPorts\n\n\noutputPort \nT\n: Tuples extracted from Kafka messages are emitted through\nthis port.\n\n\nAbstract Methods\n\n\nT getTuple(Message msg) : Converts the Kafka message to tuple.\n\n\nConcrete Classes\n\n\n\n\n\n\nKafkaSinglePortStringInputOperator :\nThis class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts string from Kafka message.\n\n\n\n\n\n\nKafkaSinglePortByteArrayInputOperator:\nThis class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts byte array from Kafka message.\n\n\n\n\n\n\nApplication Example\n\n\nThis section builds an Apex application using Kafka input operator.\nBelow is the code snippet:\n\n\n@ApplicationAnnotation(name = \nKafkaApp\n)\npublic class ExampleKafkaApplication implement
 s StreamingApplication\n{\n@Override\npublic void populateDAG(DAG dag, Configuration entries)\n{\n  KafkaSinglePortByteArrayInputOperator input =  dag.addOperator(\nMessageReader\n, new KafkaSinglePortByteArrayInputOperator());\n\n  ConsoleOutputOperator output = dag.addOperator(\nOutput\n, new ConsoleOutputOperator());\n\n  dag.addStream(\nMessageData\n, input.outputPort, output.input);\n}\n}\n\n\n\n\nBelow is the configuration for \u201ctest\u201d Kafka topic name and\n\u201clocalhost:2181\u201d is the zookeeper forum:\n\n\nproperty\n\n\nname\ndt.operator.MessageReader.prop.topic\n/name\n\n\nvalue\ntest\n/value\n\n\n/property\n\n\n\nproperty\n\n\nname\ndt.operator.KafkaInputOperator.prop.zookeeper\n/nam\n\n\nvalue\nlocalhost:2181\n/value\n\n\n/property", 
+            "title": "Kafka Input"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafka-input-operator", 
+            "text": "", 
+            "title": "KAFKA INPUT OPERATOR"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#introduction-about-kafka-input-operator", 
+            "text": "This is an input operator that consumes data from Kafka messaging system for further processing in Apex. Kafka Input Operator is an fault-tolerant and scalable Malhar Operator.", 
+            "title": "Introduction: About Kafka Input Operator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#why-is-it-needed", 
+            "text": "Kafka is a pull-based and distributed publish subscribe messaging system, topics are partitioned and replicated across\nnodes. Kafka input operator is needed when you want to read data from multiple\npartitions of a Kafka topic in parallel in an Apex application.", 
+            "title": "Why is it needed ?"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstractkafkainputoperator", 
+            "text": "This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn\u2019t have any ports.", 
+            "title": "AbstractKafkaInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#configuration-parameters", 
+            "text": "Parameter  Description    maxTuplesPerWindow  Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. Default value = MAX_VALUE     idempotentStorageManager  This is an instance of IdempotentStorageManager. Idempotency ensures that the operator will process the same set of messages in a window before and after a failure. For example, let's say the operator completed window 10 and failed somewhere between window 11. If the operator gets restored at window 10 then it will process the same messages again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Default Value = com.datatorrent.lib.io.IdempotentStorageManager. NoopIdempotentStorageManager    strategy  Operator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.
   ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.  ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.\nDefault Value = ONE_TO_ONE    msgRateUpperBound  Maximum messages upper bound. Operator repartitions when the  msgProcessedPS  exceeds this bound.  msgProcessedPS  is the average number of messages processed per second by this operator.    byteRateUpperBound  Maximum bytes upper bo
 und. Operator repartitions when the  bytesPS  exceeds this bound.  bytesPS  is the average number of bytes processed per second by this operator.     offsetManager  This is an optional parameter that is useful when the application restarts or start at specific offsets (offsets are explained below)    repartitionInterval  Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions. Default Value = 30 Seconds    repartitionCheckInterval  Interval specified in milliseconds. This value specifies the minimum interval between two offset updates. Default Value = 5 Seconds    initialPartitionCount  When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. Default Value = 1    consumer  This is an instance of com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of SimpleKafkaConsumer.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods", 
+            "text": "void emitTuple(Message message): Abstract method that emits tuples\nextracted from Kafka message.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#kafkaconsumer", 
+            "text": "This is an abstract implementation of Kafka consumer. It sends the fetch\nrequests to the leading brokers of Kafka partitions. For each request,\nit receives the set of messages and stores them into the buffer which is\nArrayBlockingQueue. SimpleKafkaConsumer\u00a0which extends\nKafkaConsumer and serves the functionality of Simple Consumer API and\nHighLevelKafkaConsumer which extends KafkaConsumer and \u00a0serves the\nfunctionality of High Level Consumer API.", 
+            "title": "KafkaConsumer"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#pre-requisites", 
+            "text": "This operator referred the Kafka Consumer API of version\n0.8.1.1. So, this operator will work with any 0.8.x and 0.7.x version of Apache Kafka.", 
+            "title": "Pre-requisites"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#configuration-parameters_1", 
+            "text": "Parameter  Type  Default  Description    zookeeper  String   Specifies the zookeeper quorum of Kafka clusters that you want to consume messages from. zookeeper \u00a0is a string in the form of hostname1:port1,hostname2:port2,hostname3:port3 \u00a0where hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of zookeeper server. \u00a0If the topic name is the same across the Kafka clusters and want to consume data from these clusters, then configure the zookeeper as follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5,c3::hs6:p6  where  c1,c2,c3 indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is optional in case of single cluster    cacheSize  int  1024  Maximum of buffered messages hold in memory.    topic  String  default_topic  Indicates the name of the topic.    initialOffset  String  latest  Indicates the type of offset i.e, \u201cearliest or latest\u201
 d. If initialOffset is \u201clatest\u201d, then the operator consumes messages from latest point of Kafka queue. If initialOffset is \u201cearliest\u201d, then the operator consumes messages starting from message queue. This can be overridden by OffsetManager.", 
+            "title": "Configuration Parameters"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_1", 
+            "text": "void commitOffset(): Commit the offsets at checkpoint.  Map  KafkaPartition, Long  getCurrentOffsets(): Return the current\n    offset status.  resetPartitionsAndOffset(Set  KafkaPartition  partitionIds,\n    Map  KafkaPartition, Long  startOffset): Reset the partitions with\n    parittionIds and offsets with startOffset.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#configuration-parameters-for-simplekafkaconsumer", 
+            "text": "Parameter  Type  Default  Description    bufferSize  int  1 MB  Specifies the maximum total size of messages for each fetch request.    metadataRefreshInterval  int  30 Seconds  Interval in between refresh the metadata change(broker change) in milliseconds. Enabling metadata refresh guarantees an automatic reconnect when a new broker is elected as the host. A value of -1 disables this feature.    metadataRefreshRetryLimit  int  -1  Specifies the maximum brokers' metadata refresh retry limit. -1 means unlimited retry.", 
+            "title": "Configuration Parameters\u00a0for SimpleKafkaConsumer"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#offsetmanager", 
+            "text": "This is an interface for offset management and is useful when consuming data\nfrom specified offsets. Updates the offsets for all the Kafka partitions\nperiodically. Below is the code snippet:\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0\u00a0  public interface OffsetManager\n{\n  public Map KafkaPartition, Long  loadInitialOffsets();\n  public void updateOffsets(Map KafkaPartition, Long  offsetsOfPartitions);\n}", 
+            "title": "OffsetManager"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_2", 
+            "text": "Map  KafkaPartition, Long  loadInitialOffsets(): Specifies the initial offset for consuming messages; called at the activation stage.  updateOffsets(Map  KafkaPartition, Long  offsetsOfPartitions): \u00a0This\nmethod is called at every repartitionCheckInterval to update offsets.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#partitioning", 
+            "text": "The logical instance of the KafkaInputOperator acts as the Partitioner\nas well as a StatsListener. This is because the\nAbstractKafkaInputOperator implements both the\ncom.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener\ninterfaces and provides an implementation of definePartitions(...) and\nprocessStats(...) which makes it auto-scalable.", 
+            "title": "Partitioning"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#response-processstatsbatchedoperatorstats-stats", 
+            "text": "The application master invokes this method on the logical instance with\nthe stats (tuplesProcessedPS, bytesPS, etc.) of each partition.\nRe-partitioning happens based on whether any new Kafka partitions added for\nthe topic or bytesPS and msgPS cross their respective upper bounds.", 
+            "title": "Response processStats(BatchedOperatorStats stats)"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#definepartitions", 
+            "text": "Based on the repartitionRequired field of the Response object which is\nreturned by processStats(...) method, the application master invokes\ndefinePartitions(...) on the logical instance which is also the\npartitioner instance. Dynamic partition can be disabled by setting the\nparameter repartitionInterval value to a negative value.", 
+            "title": "DefinePartitions"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstractsingleportkafkainputoperator", 
+            "text": "This class extends AbstractKafkaInputOperator and having single output\nport, will emit the messages through this port.", 
+            "title": "AbstractSinglePortKafkaInputOperator"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#ports", 
+            "text": "outputPort  T : Tuples extracted from Kafka messages are emitted through\nthis port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#abstract-methods_3", 
+            "text": "T getTuple(Message msg) : Converts the Kafka message to tuple.", 
+            "title": "Abstract Methods"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#concrete-classes", 
+            "text": "KafkaSinglePortStringInputOperator :\nThis class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts string from Kafka message.    KafkaSinglePortByteArrayInputOperator:\nThis class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts byte array from Kafka message.", 
+            "title": "Concrete Classes"
+        }, 
+        {
+            "location": "/operators/kafkaInputOperator/#application-example", 
+            "text": "This section builds an Apex application using Kafka input operator.\nBelow is the code snippet:  @ApplicationAnnotation(name =  KafkaApp )\npublic class ExampleKafkaApplication implements StreamingApplication\n{\n@Override\npublic void populateDAG(DAG dag, Configuration entries)\n{\n  KafkaSinglePortByteArrayInputOperator input =  dag.addOperator( MessageReader , new KafkaSinglePortByteArrayInputOperator());\n\n  ConsoleOutputOperator output = dag.addOperator( Output , new ConsoleOutputOperator());\n\n  dag.addStream( MessageData , input.outputPort, output.input);\n}\n}  Below is the configuration for \u201ctest\u201d Kafka topic name and\n\u201clocalhost:2181\u201d is the zookeeper forum:  property  name dt.operator.MessageReader.prop.topic /name  value test /value  /property  property  name dt.operator.KafkaInputOperator.prop.zookeeper /nam  value localhost:2181 /value  /property", 
+            "title": "Application Example"
+        }, 
+        {
+            "location": "/operators/file_splitter/", 
+            "text": "File Splitter\n\n\nThis is a simple operator whose main function is to split a file virtually and create metadata describing the files and the splits. \n\n\nWhy is it needed?\n\n\nIt is a common operation to read a file and parse it. This operation can be parallelized by having multiple partitions of such operators and each partition operating on different files. However, at times when a file is large then a single partition reading it can become a bottleneck.\nIn these cases, throughput can be increased if instances of the partitioned operator can read and parse non-overlapping sets of file blocks. This is where file splitter comes in handy. It creates metadata of blocks of file which serves as tasks handed out to downstream operator partitions. \nThe downstream partitions can read/parse the block without the need of interacting with other partitions.\n\n\nClass Diagram\n\n\n\n\nAbstractFileSplitter\n\n\nThe abstract implementation defines the logic of processi
 ng \nFileInfo\n. This comprises the following tasks -  \n\n\n\n\n\n\nbuilding \nFileMetadata\n per file and emitting it. This metadata contains the file information such as filepath, no. of blocks in it, length of the file, all the block ids, etc.\n\n\n\n\n\n\ncreating \nBlockMetadataIterator\n from \nFileMetadata\n. The iterator lazy-loads the block metadata when needed. We use an iterator because the no. of blocks in a file can be huge if the block size is small and loading all of them at once in memory may cause out of memory errors.\n\n\n\n\n\n\nretrieving \nBlockMetadata.FileBlockMetadata\n from the block metadata iterator and emitting it. The FileBlockMetadata contains the block id, start offset of the block, length of file in the block, etc. The number of block metadata emitted per window are controlled by \nblocksThreshold\n setting which by default is 1.  \n\n\n\n\n\n\nThe main utility method that performs all the above tasks is the \nprocess()\n method. Concrete implementa
 tions can invoke this method whenever they have data to process.\n\n\nPorts\n\n\nDeclares only output ports on which file metadata and block metadata are emitted.\n\n\n\n\nfilesMetadataOutput: metadata for each file is emitted on this port. \n\n\nblocksMetadataOutput: metadata for each block is emitted on this port. \n\n\n\n\nprocess()\n method\n\n\nWhen process() is invoked, any pending blocks from the current file are emitted on the 'blocksMetadataOutput' port. If the threshold for blocks per window is still not met then a new input file is processed - corresponding metadata is emitted on 'filesMetadataOutput' and more of its blocks are emitted. This operation is repeated until the \nblocksThreshold\n is reached or there are no more new files.\n\n\n  protected void process()\n  {\n    if (blockMetadataIterator != null \n blockCount \n blocksThreshold) {\n      emitBlockMetadata();\n    }\n\n    FileInfo fileInfo;\n    while (blockCount \n blocksThreshold \n (fileInfo = getFileInfo
 ()) != null) {\n      if (!processFileInfo(fileInfo)) {\n        break;\n      }\n    }\n  }\n\n\n\n\nAbstract methods\n\n\n\n\n\n\nFileInfo getFileInfo()\n: called from within the \nprocess()\n and provides the next file to process.\n\n\n\n\n\n\nlong getDefaultBlockSize()\n: provides the block size which is used when user hasn't configured the size.\n\n\n\n\n\n\nFileStatus getFileStatus(Path path)\n: provides the \norg.apache.hadoop.fs.FileStatus\n instance for a path.   \n\n\n\n\n\n\nConfiguration\n\n\n\n\nblockSize\n: size of a block.\n\n\nblocksThreshold\n: threshold on the number of blocks emitted by file splitter every window. This setting is used for throttling the work for downstream operators.\n\n\n\n\nFileSplitterBase\n\n\nSimple operator that receives tuples of type \nFileInfo\n on its \ninput\n port. \nFileInfo\n contains the information (currently just the file path) about the file which this operator uses to create file metadata and block metadata.\n\n\nExample applica
 tion\n\n\nThis is a simple sub-dag that demonstrates how FileSplitterBase can be plugged into an application.\n\n\n\nThe upstream operator emits tuples of type \nFileInfo\n on its output port which is connected to splitter input port. The downstream receives tuples of type \nBlockMetadata.FileBlockMetadata\n from the splitter's block metadata output port.\n\n\npublic class ApplicationWithBaseSplitter implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    JMSInput input = dag.addOperator(\nInput\n, new JMSInput());\n    FileSplitterBase splitter = dag.addOperator(\nSplitter\n, new FileSplitterBase());\n    FSSliceReader blockReader = dag.addOperator(\nBlockReader\n, new FSSliceReader());\n    ...\n    dag.addStream(\nfile-info\n, input.output, splitter.input);\n    dag.addStream(\nblock-metadata\n, splitter.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    ...\n  }\n\n  public static class JMSInput extends
  AbstractJMSInputOperator\nAbstractFileSplitter.FileInfo\n\n  {\n\n    public final transient DefaultOutputPort\nAbstractFileSplitter.FileInfo\n output = new DefaultOutputPort\n();\n\n    @Override\n    protected AbstractFileSplitter.FileInfo convert(Message message) throws JMSException\n    {\n      //assuming the message is a text message containing the absolute path of the file.\n      return new AbstractFileSplitter.FileInfo(null, ((TextMessage)message).getText());\n    }\n\n    @Override\n    protected void emit(AbstractFileSplitter.FileInfo payload)\n    {\n      output.emit(payload);\n    }\n  }\n}\n\n\n\n\nPorts\n\n\nDeclares an input port on which it receives tuples from the upstream operator. Output ports are inherited from AbstractFileSplitter.\n\n\n\n\ninput: non optional port on which tuples of type \nFileInfo\n are received.\n\n\n\n\nConfiguration\n\n\n\n\nfile\n: path of the file from which the filesystem is inferred. FileSplitter creates an instance of \norg.apache.h
 adoop.fs.FileSystem\n which is why this path is needed.  \n\n\n\n\nFileSystem.newInstance(new Path(file).toUri(), new Configuration());\n\n\n\n\nThe fs instance is then used to fetch the default block size and \norg.apache.hadoop.fs.FileStatus\n for each file path.\n\n\nFileSplitterInput\n\n\nThis is an input operator that discovers files itself. The scanning of the directories for new files is asynchronous which is handled by \nTimeBasedDirectoryScanner\n. The function of TimeBasedDirectoryScanner is to periodically scan specified directories and find files which were newly added or modified. The interaction between the operator and the scanner is depicted in the diagram below.\n\n\n\n\nExample application\n\n\nThis is a simple sub-dag that demonstrates how FileSplitterInput can be plugged into an application.\n\n\n\n\nSplitter is the input operator here that sends block metadata to the downstream BlockReader.\n\n\n  @Override\n  public void populateDAG(DAG dag, Configuration confi
 guration)\n  {\n    FileSplitterInput input = dag.addOperator(\nInput\n, new FileSplitterInput());\n    FSSliceReader reader = dag.addOperator(\nBlock Reader\n, new FSSliceReader());\n    ...\n    dag.addStream(\nblock-metadata\n, input.blocksMetadataOutput, reader.blocksMetadataInput);\n    ...\n  }\n\n\n\n\n\nPorts\n\n\nSince it is an input operator there are no input ports and output ports are inherited from AbstractFileSplitter.\n\n\nConfiguration\n\n\n\n\nscanner\n: the component that scans directories asynchronously. It is of type \ncom.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner\n. The basic implementation of TimeBasedDirectoryScanner can be customized by users.  \n\n\n\n\na. \nfiles\n: comma separated list of directories to scan.  \n\n\nb. \nrecursive\n: flag that controls whether the directories should be scanned recursively.  \n\n\nc. \nscanIntervalMillis\n: interval specified in milliseconds after which another scan iteration is triggered.  \n\n\nd. \nfil
 ePatternRegularExp\n: regular expression for accepted file names.  \n\n\ne. \ntrigger\n: a flag that triggers a scan iteration instantly. If the scanner thread is idling then it will initiate a scan immediately otherwise if a scan is in progress, then the new iteration will be triggered immediately after the completion of current one.\n2. \nidempotentStorageManager\n: by default FileSplitterInput is idempotent. \nIdempotency ensures that the operator will process the same set of files/blocks in a window if it has seen that window previously, i.e., before a failure. For example, let's say the operator completed window 10 and failed somewhere between window 11. If the operator gets restored at window 10 then it will process the same file/block again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Therefore, if one
  doesn't care about idempotency then they can set this property to be an instance of \ncom.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager\n.\n\n\nHandling of split records\n\n\nSplitting of files to create tasks for downstream operator needs to be a simple operation that doesn't consume a lot of resources and is fast. This is why the file splitter doesn't open files to read. The downside of that is if the file contains records then a record may split across adjacent blocks. Handling of this is left to the downstream operator.\n\n\nWe have created Block readers in Apex-malhar library that handle line splits efficiently. The 2 line readers- \nAbstractFSLineReader\n and \nAbstractFSReadAheadLineReader\n can be found here \nAbstractFSBlockReader\n.", 
+            "title": "File Splitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#file-splitter", 
+            "text": "This is a simple operator whose main function is to split a file virtually and create metadata describing the files and the splits.", 
+            "title": "File Splitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#why-is-it-needed", 
+            "text": "It is a common operation to read a file and parse it. This operation can be parallelized by having multiple partitions of such operators and each partition operating on different files. However, at times when a file is large then a single partition reading it can become a bottleneck.\nIn these cases, throughput can be increased if instances of the partitioned operator can read and parse non-overlapping sets of file blocks. This is where file splitter comes in handy. It creates metadata of blocks of file which serves as tasks handed out to downstream operator partitions. \nThe downstream partitions can read/parse the block without the need of interacting with other partitions.", 
+            "title": "Why is it needed?"
+        }, 
+        {
+            "location": "/operators/file_splitter/#class-diagram", 
+            "text": "", 
+            "title": "Class Diagram"
+        }, 
+        {
+            "location": "/operators/file_splitter/#abstractfilesplitter", 
+            "text": "The abstract implementation defines the logic of processing  FileInfo . This comprises the following tasks -      building  FileMetadata  per file and emitting it. This metadata contains the file information such as filepath, no. of blocks in it, length of the file, all the block ids, etc.    creating  BlockMetadataIterator  from  FileMetadata . The iterator lazy-loads the block metadata when needed. We use an iterator because the no. of blocks in a file can be huge if the block size is small and loading all of them at once in memory may cause out of memory errors.    retrieving  BlockMetadata.FileBlockMetadata  from the block metadata iterator and emitting it. The FileBlockMetadata contains the block id, start offset of the block, length of file in the block, etc. The number of block metadata emitted per window are controlled by  blocksThreshold  setting which by default is 1.      The main utility method that performs all the above tasks is the  process()  met
 hod. Concrete implementations can invoke this method whenever they have data to process.", 
+            "title": "AbstractFileSplitter"
+        }, 
+        {
+            "location": "/operators/file_splitter/#ports", 
+            "text": "Declares only output ports on which file metadata and block metadata are emitted.   filesMetadataOutput: metadata for each file is emitted on this port.   blocksMetadataOutput: metadata for each block is emitted on this port.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/file_splitter/#abstract-methods", 
+            "text": "FileInfo getFileInfo() : called from within the  process()  and provides the next file to process.    long getDefaultBlockSize() : provides the block size which is used when user hasn't configured the size.    FileStatus getFileStatus(Path path) : provides the  org.apache.hadoop.fs.FileStatus  instance for a path.", 
+            "title": "Abstract methods"
+        }, 
+        {
+            "location": "/operators/file_splitter/#configuration", 
+            "text": "blockSize : size of a block.  blocksThreshold : threshold on the number of blocks emitted by file splitter every window. This setting is used for throttling the work for downstream operators.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/file_splitter/#filesplitterbase", 
+            "text": "Simple operator that receives tuples of type  FileInfo  on its  input  port.  FileInfo  contains the information (currently just the file path) about the file which this operator uses to create file metadata and block metadata.", 
+            "title": "FileSplitterBase"
+        }, 
+        {
+            "location": "/operators/file_splitter/#example-application", 
+            "text": "This is a simple sub-dag that demonstrates how FileSplitterBase can be plugged into an application.  The upstream operator emits tuples of type  FileInfo  on its output port which is connected to splitter input port. The downstream receives tuples of type  BlockMetadata.FileBlockMetadata  from the splitter's block metadata output port.  public class ApplicationWithBaseSplitter implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    JMSInput input = dag.addOperator( Input , new JMSInput());\n    FileSplitterBase splitter = dag.addOperator( Splitter , new FileSplitterBase());\n    FSSliceReader blockReader = dag.addOperator( BlockReader , new FSSliceReader());\n    ...\n    dag.addStream( file-info , input.output, splitter.input);\n    dag.addStream( block-metadata , splitter.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    ...\n  }\n\n  public static class JMSInput extends AbstractJMSI
 nputOperator AbstractFileSplitter.FileInfo \n  {\n\n    public final transient DefaultOutputPort AbstractFileSplitter.FileInfo  output = new DefaultOutputPort ();\n\n    @Override\n    protected AbstractFileSplitter.FileInfo convert(Message message) throws JMSException\n    {\n      //assuming the message is a text message containing the absolute path of the file.\n      return new AbstractFileSplitter.FileInfo(null, ((TextMessage)message).getText());\n    }\n\n    @Override\n    protected void emit(AbstractFileSplitter.FileInfo payload)\n    {\n      output.emit(payload);\n    }\n  }\n}", 
+            "title": "Example application"
+        }, 
+        {
+            "location": "/operators/file_splitter/#ports_1", 
+            "text": "Declares an input port on which it receives tuples from the upstream operator. Output ports are inherited from AbstractFileSplitter.   input: non optional port on which tuples of type  FileInfo  are received.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/file_splitter/#configuration_1", 
+            "text": "file : path of the file from which the filesystem is inferred. FileSplitter creates an instance of  org.apache.hadoop.fs.FileSystem  which is why this path is needed.     FileSystem.newInstance(new Path(file).toUri(), new Configuration());  The fs instance is then used to fetch the default block size and  org.apache.hadoop.fs.FileStatus  for each file path.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/file_splitter/#filesplitterinput", 
+            "text": "This is an input operator that discovers files itself. The scanning of the directories for new files is asynchronous which is handled by  TimeBasedDirectoryScanner . The function of TimeBasedDirectoryScanner is to periodically scan specified directories and find files which were newly added or modified. The interaction between the operator and the scanner is depicted in the diagram below.", 
+            "title": "FileSplitterInput"
+        }, 
+        {
+            "location": "/operators/file_splitter/#example-application_1", 
+            "text": "This is a simple sub-dag that demonstrates how FileSplitterInput can be plugged into an application.   Splitter is the input operator here that sends block metadata to the downstream BlockReader.    @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    FileSplitterInput input = dag.addOperator( Input , new FileSplitterInput());\n    FSSliceReader reader = dag.addOperator( Block Reader , new FSSliceReader());\n    ...\n    dag.addStream( block-metadata , input.blocksMetadataOutput, reader.blocksMetadataInput);\n    ...\n  }", 
+            "title": "Example application"
+        }, 
+        {
+            "location": "/operators/file_splitter/#ports_2", 
+            "text": "Since it is an input operator there are no input ports and output ports are inherited from AbstractFileSplitter.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/file_splitter/#configuration_2", 
+            "text": "scanner : the component that scans directories asynchronously. It is of type  com.datatorrent.lib.io.fs.FileSplitter.TimeBasedDirectoryScanner . The basic implementation of TimeBasedDirectoryScanner can be customized by users.     a.  files : comma separated list of directories to scan.    b.  recursive : flag that controls whether the directories should be scanned recursively.    c.  scanIntervalMillis : interval specified in milliseconds after which another scan iteration is triggered.    d.  filePatternRegularExp : regular expression for accepted file names.    e.  trigger : a flag that triggers a scan iteration instantly. If the scanner thread is idling then it will initiate a scan immediately otherwise if a scan is in progress, then the new iteration will be triggered immediately after the completion of current one.\n2.  idempotentStorageManager : by default FileSplitterInput is idempotent. \nIdempotency ensures that the operator will process the same set o
 f files/blocks in a window if it has seen that window previously, i.e., before a failure. For example, let's say the operator completed window 10 and failed somewhere between window 11. If the operator gets restored at window 10 then it will process the same file/block again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Therefore, if one doesn't care about idempotency then they can set this property to be an instance of  com.datatorrent.lib.io.IdempotentStorageManager.NoopIdempotentStorageManager .", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/file_splitter/#handling-of-split-records", 
+            "text": "Splitting of files to create tasks for downstream operator needs to be a simple operation that doesn't consume a lot of resources and is fast. This is why the file splitter doesn't open files to read. The downside of that is if the file contains records then a record may split across adjacent blocks. Handling of this is left to the downstream operator.  We have created Block readers in Apex-malhar library that handle line splits efficiently. The 2 line readers-  AbstractFSLineReader  and  AbstractFSReadAheadLineReader  can be found here  AbstractFSBlockReader .", 
+            "title": "Handling of split records"
+        }, 
+        {
+            "location": "/operators/block_reader/", 
+            "text": "Block Reader\n\n\nThis is a scalable operator that reads and parses blocks of data sources into records. A data source can be a file or a message bus that contains records and a block defines a chunk of data in the source by specifying the block offset and the length of the source belonging to the block. \n\n\nWhy is it needed?\n\n\nA Block Reader is needed to parallelize reading and parsing of a single data source, for example a file. Simple parallelism of reading data sources can be achieved by multiple partitions reading different source of same type (for files see \nAbstractFileInputOperator\n) but Block Reader partitions can read blocks of same source in parallel and parse them for records ensuring that no record is duplicated or missed.\n\n\nClass Diagram\n\n\n\n\nAbstractBlockReader\n\n\nThis is the abstract implementation that serves as the base for different types of data sources. It defines how a block metadata is processed. The flow diagram below desc
 ribes the processing of a block metadata.\n\n\n\n\nPorts\n\n\n\n\n\n\nblocksMetadataInput: input port on which block metadata are received.\n\n\n\n\n\n\nblocksMetadataOutput: output port on which block metadata are emitted if the port is connected. This port is useful when a downstream operator that receives records from block reader may also be interested to know the details of the corresponding blocks.\n\n\n\n\n\n\nmessages: output port on which tuples of type \ncom.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord\n are emitted. This class encapsulates a \nrecord\n and the \nblockId\n of the corresponding block.\n\n\n\n\n\n\nreaderContext\n\n\nThis is one of the most important fields in the block reader. It is of type \ncom.datatorrent.lib.io.block.ReaderContext\n and is responsible for fetching bytes that make a record. It also lets the reader know how many total bytes were consumed which may not be equal to the total bytes in a record because consumed bytes also include
  bytes for the record delimiter which may not be a part of the actual record.\n\n\nOnce the reader creates an input stream for the block (or uses the previous opened stream if the current block is successor of the previous block) it initializes the reader context by invoking \nreaderContext.initialize(stream, blockMetadata, consecutiveBlock);\n. Initialize method is where any implementation of \nReaderContext\n can perform all the operations which have to be executed just before reading the block or create states which are used during the lifetime of reading the block.\n\n\nOnce the initialization is done, \nreaderContext.next()\n is called repeatedly until it returns \nnull\n. It is left to the \nReaderContext\n implementations to decide when a block is completely processed. In cases when a record is split across adjacent blocks, reader context may decide to read ahead of the current block boundary to completely fetch the split record (examples- \nLineReaderContext\n and \nReadAhea
 dLineReaderContext\n). In other cases when there isn't a possibility of split record (example- \nFixedBytesReaderContext\n), it returns \nnull\n immediately when the block boundary is reached. The return type of \nreaderContext.next()\n is of type \ncom.datatorrent.lib.io.block.ReaderContext.Entity\n which is just a wrapper for a \nbyte[]\n that represents the record and total bytes used in fetching the record.\n\n\nAbstract methods\n\n\n\n\n\n\nSTREAM setupStream(B block)\n: creating a stream for a block is dependent on the type of source which is not known to AbstractBlockReader. Sub-classes which deal with a specific data source provide this implementation.\n\n\n\n\n\n\nR convertToRecord(byte[] bytes)\n: this converts the array of bytes into the actual instance of record type.\n\n\n\n\n\n\nAuto-scalability\n\n\nBlock reader can auto-scale, that is, depending on the backlog (total number of all the blocks which are waiting in the \nblocksMetadataInput\n port queue of all partition
 s) it can create more partitions or reduce them. Details are discussed in the last section which covers the \npartitioner and stats-listener\n.\n\n\nConfiguration\n\n\n\n\nmaxReaders\n: when auto-scaling is enabled, this controls the maximum number of block reader partitions that can be created.\n\n\nminReaders\n: when auto-scaling is enabled, this controls the minimum number of block reader partitions that should always exist.\n\n\ncollectStats\n: this enables or disables auto-scaling. When it is set to \ntrue\n the stats (number of blocks in the queue) are collected and this triggers partitioning; otherwise auto-scaling is disabled.\n\n\nintervalMillis\n: when auto-scaling is enabled, this specifies the interval at which the reader will trigger the logic of computing the backlog and auto-scale.\n\n\n\n\n AbstractFSBlockReader\n\n\nThis abstract implementation deals with files. Different types of file systems that are implementations of \norg.apache.hadoop.fs.FileSystem\n are suppo
 rted. The user can override \ngetFSInstance()\n method to create an instance of a specific \nFileSystem\n. By default, filesystem instance is created from the filesytem URI that comes from the default hadoop configuration.\n\n\nprotected FileSystem getFSInstance() throws IOException\n{\n  return FileSystem.newInstance(configuration);\n}\n\n\n\n\nIt uses this filesystem instance to setup a stream of type \norg.apache.hadoop.fs.FSDataInputStream\n to read the block.\n\n\n@Override\nprotected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException\n{\n  return fs.open(new Path(block.getFilePath()));\n}\n\n\n\n\nAll the ports and configurations are derived from the super class. It doesn't provide an implementation of \nconvertToRecord(byte[] bytes)\n method which is delegated to concrete sub-classes.\n\n\nExample Application\n\n\nThis simple dag demonstrates how any concrete implementation of \nAbstractFSBlockReader\n can be plugged into an application. \
 n\n\n\n\nIn the above application, file splitter creates block metadata for files which are sent to block reader. Partitions of the block reader parses the file blocks for records which are filtered, transformed and then persisted to a file (created per block). Therefore block reader is parallel partitioned with the 2 downstream operators - filter/converter and record output operator. The code which implements this dag is below.\n\n\npublic class ExampleApplication implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    FileSplitterInput input = dag.addOperator(\nFile-splitter\n, new FileSplitterInput());\n    //any concrete implementation of AbstractFSBlockReader based on the use-case can be added here.\n    LineReader blockReader = dag.addOperator(\nBlock-reader\n, new LineReader());\n    Filter filter = dag.addOperator(\nFilter\n, new Filter());\n    RecordOutputOperator recordOutputOperator = dag.addOperator(\nRe
 cord-writer\n, new RecordOutputOperator());\n\n    dag.addStream(\nfile-block metadata\n, input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    dag.addStream(\nrecords\n, blockReader.messages, filter.input);\n    dag.addStream(\nfiltered-records\n, filter.output, recordOutputOperator.input);\n  }\n\n  /**\n   * Concrete implementation of {@link AbstractFSBlockReader} for which a record is a line in the file.\n   */\n  public static class LineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader\nString\n\n  {\n\n    @Override\n    protected String convertToRecord(byte[] bytes)\n    {\n      return new String(bytes);\n    }\n  }\n\n  /**\n   * Considers any line starting with a '.' as invalid. Emits the valid records.\n   */\n  public static class Filter extends BaseOperator\n  {\n    public final transient DefaultOutputPort\nAbstractBlockReader.ReaderRecord\nString\n output = new DefaultOutputPort\n();\n    public final transient DefaultInputPort\nAbstractB
 lockReader.ReaderRecord\nString\n input = new DefaultInputPort\nAbstractBlockReader.ReaderRecord\nString\n()\n    {\n      @Override\n      public void process(AbstractBlockReader.ReaderRecord\nString\n stringRecord)\n      {\n        //filter records and transform\n        //if the string starts with a '.' ignore the string.\n        if (!StringUtils.startsWith(stringRecord.getRecord(), \n.\n)) {\n          output.emit(stringRecord);\n        }\n      }\n    };\n  }\n\n  /**\n   * Persists the valid records to corresponding block files.\n   */\n  public static class RecordOutputOperator extends AbstractFileOutputOperator\nAbstractBlockReader.ReaderRecord\nString\n\n  {\n    @Override\n    protected String getFileName(AbstractBlockReader.ReaderRecord\nString\n tuple)\n    {\n      return Long.toHexString(tuple.getBlockId());\n    }\n\n    @Override\n    protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord\nString\n tuple)\n    {\n      return tuple.getRecord().getBytes
 ();\n    }\n  }\n}\n\n\n\n\nConfiguration to parallel partition block reader with its downstream operators.\n\n\n  \nproperty\n\n    \nname\ndt.operator.Filter.port.input.attr.PARTITION_PARALLEL\n/name\n\n    \nvalue\ntrue\n/value\n\n  \n/property\n\n  \nproperty\n\n    \nname\ndt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL\n/name\n\n    \nvalue\ntrue\n/value\n\n  \n/property\n\n\n\n\n\nAbstractFSReadAheadLineReader\n\n\nThis extension of \nAbstractFSBlockReader\n parses lines from a block and binds the \nreaderContext\n field to an instance of \nReaderContext.ReadAheadLineReaderContext\n.\n\n\nIt is abstract because it doesn't provide an implementation of \nconvertToRecord(byte[] bytes)\n since the user may want to convert the bytes that make a line into some other type. \n\n\nReadAheadLineReaderContext\n\n\nIn order to handle a line split across adjacent blocks, ReadAheadLineReaderContext always reads beyond the block boundary and ignores the bytes till the first end
 -of-line character of all the blocks except the first block of the file. This ensures that no line is missed or incomplete.\n\n\nThis is one of the most common ways of handling a split record. It doesn't require any further information to decide if a line is complete. However, the cost of this consistent way to handle a line split is that it always reads from the next block.\n\n\nAbstractFSLineReader\n\n\nSimilar to \nAbstractFSReadAheadLineReader\n, even this parses lines from a block. However, it binds the \nreaderContext\n field to an instance of \nReaderContext.LineReaderContext\n.\n\n\nLineReaderContext\n\n\nThis handles the line split differently from \nReadAheadLineReaderContext\n. It doesn't always read from the next block. If the end of the last line is aligned with the block boundary then it stops processing the block. It does read from the next block when the boundaries are not aligned, that is, last line extends beyond the block boundary. The result of this is an inconsi
 stency in reading the next block.\n\n\nWhen the boundary of the last line of the previous block was aligned with its block, then the first line of the current block is a valid line. However, in the other case the bytes from the block start offset to the first end-of-line character should be ignored. Therefore, this means that any record formed by this reader context has to be validated. For example, if the lines are of fixed size then size of each record can be validated or if each line begins with a special field then that knowledge can be used to check if a record is complete.\n\n\nIf the validations of completeness fails for a line then \nconvertToRecord(byte[] bytes)\n should return null.\n\n\nFSSliceReader\n\n\nA concrete extension of \nAbstractFSBlockReader\n that reads fixed-size \nbyte[]\n from a block and emits the byte array wrapped in \ncom.datatorrent.netlet.util.Slice\n.\n\n\nThis operator binds the \nreaderContext\n to an instance of \nReaderContext.FixedBytesReaderCon
 text\n.\n\n\nFixedBytesReaderContext\n\n\nThis implementation of \nReaderContext\n never reads beyond a block boundary which can result in the last \nbyte[]\n of a block to be of a shorter length than the rest of the records.\n\n\nConfiguration\n\n\nreaderContext.length\n: length of each record. By default, this is initialized to the default hdfs block size.\n\n\nPartitioner and StatsListener\n\n\nThe logical instance of the block reader acts as the Partitioner (unless a custom partitioner is set using the operator attribute - \nPARTITIONER\n) as well as a StatsListener. This is because the \n\nAbstractBlockReader\n implements both the \ncom.datatorrent.api.Partitioner\n and \ncom.datatorrent.api.StatsListener\n interfaces and provides an implementation of \ndefinePartitions(...)\n and \nprocessStats(...)\n which make it auto-scalable.\n\n\nprocessStats \n\n\nThe application master invokes \nResponse processStats(BatchedOperatorStats stats)\n method on the logical instance with the 
 stats (\ntuplesProcessedPSMA\n, \ntuplesEmittedPSMA\n, \nlatencyMA\n, etc.) of each partition. The data which this operator is interested in is the \nqueueSize\n of the input port \nblocksMetadataInput\n.\n\n\nUsually the \nqueueSize\n of an input port gives the count of waiting control tuples plus data tuples. However, if a stats listener is interested only in the count of data tuples then that can be expressed by annotating the class with \n@DataQueueSize\n. In this case \nAbstractBlockReader\n itself is the \nStatsListener\n which is why it is annotated with \n@DataQueueSize\n.\n\n\nThe logical instance caches the queue size per partition and at regular intervals (configured by \nintervalMillis\n) sums these values to find the total backlog which is then used to decide whether re-partitioning is needed. The flow-diagram below describes this logic.\n\n\n\n\nThe goal of this logic is to create as many partitions within bounds (see \nmaxReaders\n and \nminReaders\n above) to quickly
  reduce this backlog or if the backlog is small then remove any idle partitions.\n\n\ndefinePartitions\n\n\nBased on the \nrepartitionRequired\n field of the \nResponse\n object which is returned by \nprocessStats\n method, the application master invokes \n\n\nCollection\nPartition\nAbstractBlockReader\n...\n definePartitions(Collection\nPartition\nAbstractBlockReader\n...\n partitions, PartitioningContext context)\n\n\n\n\non the logical instance which is also the partitioner instance. The implementation calculates the difference between required partitions and the existing count of partitions. If this difference is negative, then equivalent number of partitions are removed otherwise new partitions are created. \n\n\nPlease note auto-scaling can be disabled by setting \ncollectStats\n to \nfalse\n. If the use-case requires only static partitioning, then that can be achieved by setting \nStatelessPartitioner\n as the operator attribute- \nPARTITIONER\n on the block reader.", 
+            "title": "Block Reader"
+        }, 
+        {
+            "location": "/operators/block_reader/#block-reader", 
+            "text": "This is a scalable operator that reads and parses blocks of data sources into records. A data source can be a file or a message bus that contains records and a block defines a chunk of data in the source by specifying the block offset and the length of the source belonging to the block.", 
+            "title": "Block Reader"
+        }, 
+        {
+            "location": "/operators/block_reader/#why-is-it-needed", 
+            "text": "A Block Reader is needed to parallelize reading and parsing of a single data source, for example a file. Simple parallelism of reading data sources can be achieved by multiple partitions reading different source of same type (for files see  AbstractFileInputOperator ) but Block Reader partitions can read blocks of same source in parallel and parse them for records ensuring that no record is duplicated or missed.", 
+            "title": "Why is it needed?"
+        }, 
+        {
+            "location": "/operators/block_reader/#class-diagram", 
+            "text": "", 
+            "title": "Class Diagram"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractblockreader", 
+            "text": "This is the abstract implementation that serves as the base for different types of data sources. It defines how a block metadata is processed. The flow diagram below describes the processing of a block metadata.", 
+            "title": "AbstractBlockReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#ports", 
+            "text": "blocksMetadataInput: input port on which block metadata are received.    blocksMetadataOutput: output port on which block metadata are emitted if the port is connected. This port is useful when a downstream operator that receives records from block reader may also be interested to know the details of the corresponding blocks.    messages: output port on which tuples of type  com.datatorrent.lib.io.block.AbstractBlockReader.ReaderRecord  are emitted. This class encapsulates a  record  and the  blockId  of the corresponding block.", 
+            "title": "Ports"
+        }, 
+        {
+            "location": "/operators/block_reader/#readercontext", 
+            "text": "This is one of the most important fields in the block reader. It is of type  com.datatorrent.lib.io.block.ReaderContext  and is responsible for fetching bytes that make a record. It also lets the reader know how many total bytes were consumed which may not be equal to the total bytes in a record because consumed bytes also include bytes for the record delimiter which may not be a part of the actual record.  Once the reader creates an input stream for the block (or uses the previous opened stream if the current block is successor of the previous block) it initializes the reader context by invoking  readerContext.initialize(stream, blockMetadata, consecutiveBlock); . Initialize method is where any implementation of  ReaderContext  can perform all the operations which have to be executed just before reading the block or create states which are used during the lifetime of reading the block.  Once the initialization is done,  readerContext.next()  is called repeatedl
 y until it returns  null . It is left to the  ReaderContext  implementations to decide when a block is completely processed. In cases when a record is split across adjacent blocks, reader context may decide to read ahead of the current block boundary to completely fetch the split record (examples-  LineReaderContext  and  ReadAheadLineReaderContext ). In other cases when there isn't a possibility of split record (example-  FixedBytesReaderContext ), it returns  null  immediately when the block boundary is reached. The return type of  readerContext.next()  is of type  com.datatorrent.lib.io.block.ReaderContext.Entity  which is just a wrapper for a  byte[]  that represents the record and total bytes used in fetching the record.", 
+            "title": "readerContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstract-methods", 
+            "text": "STREAM setupStream(B block) : creating a stream for a block is dependent on the type of source which is not known to AbstractBlockReader. Sub-classes which deal with a specific data source provide this implementation.    R convertToRecord(byte[] bytes) : this converts the array of bytes into the actual instance of record type.", 
+            "title": "Abstract methods"
+        }, 
+        {
+            "location": "/operators/block_reader/#auto-scalability", 
+            "text": "Block reader can auto-scale, that is, depending on the backlog (total number of all the blocks which are waiting in the  blocksMetadataInput  port queue of all partitions) it can create more partitions or reduce them. Details are discussed in the last section which covers the  partitioner and stats-listener .", 
+            "title": "Auto-scalability"
+        }, 
+        {
+            "location": "/operators/block_reader/#configuration", 
+            "text": "maxReaders : when auto-scaling is enabled, this controls the maximum number of block reader partitions that can be created.  minReaders : when auto-scaling is enabled, this controls the minimum number of block reader partitions that should always exist.  collectStats : this enables or disables auto-scaling. When it is set to  true  the stats (number of blocks in the queue) are collected and this triggers partitioning; otherwise auto-scaling is disabled.  intervalMillis : when auto-scaling is enabled, this specifies the interval at which the reader will trigger the logic of computing the backlog and auto-scale.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/block_reader/#example-application", 
+            "text": "This simple dag demonstrates how any concrete implementation of  AbstractFSBlockReader  can be plugged into an application.    In the above application, file splitter creates block metadata for files which are sent to block reader. Partitions of the block reader parses the file blocks for records which are filtered, transformed and then persisted to a file (created per block). Therefore block reader is parallel partitioned with the 2 downstream operators - filter/converter and record output operator. The code which implements this dag is below.  public class ExampleApplication implements StreamingApplication\n{\n  @Override\n  public void populateDAG(DAG dag, Configuration configuration)\n  {\n    FileSplitterInput input = dag.addOperator( File-splitter , new FileSplitterInput());\n    //any concrete implementation of AbstractFSBlockReader based on the use-case can be added here.\n    LineReader blockReader = dag.addOperator( Block-reader , new LineReader());\n 
    Filter filter = dag.addOperator( Filter , new Filter());\n    RecordOutputOperator recordOutputOperator = dag.addOperator( Record-writer , new RecordOutputOperator());\n\n    dag.addStream( file-block metadata , input.blocksMetadataOutput, blockReader.blocksMetadataInput);\n    dag.addStream( records , blockReader.messages, filter.input);\n    dag.addStream( filtered-records , filter.output, recordOutputOperator.input);\n  }\n\n  /**\n   * Concrete implementation of {@link AbstractFSBlockReader} for which a record is a line in the file.\n   */\n  public static class LineReader extends AbstractFSBlockReader.AbstractFSReadAheadLineReader String \n  {\n\n    @Override\n    protected String convertToRecord(byte[] bytes)\n    {\n      return new String(bytes);\n    }\n  }\n\n  /**\n   * Considers any line starting with a '.' as invalid. Emits the valid records.\n   */\n  public static class Filter extends BaseOperator\n  {\n    public final transient DefaultOutputPort AbstractBlockRea
 der.ReaderRecord String  output = new DefaultOutputPort ();\n    public final transient DefaultInputPort AbstractBlockReader.ReaderRecord String  input = new DefaultInputPort AbstractBlockReader.ReaderRecord String ()\n    {\n      @Override\n      public void process(AbstractBlockReader.ReaderRecord String  stringRecord)\n      {\n        //filter records and transform\n        //if the string starts with a '.' ignore the string.\n        if (!StringUtils.startsWith(stringRecord.getRecord(),  . )) {\n          output.emit(stringRecord);\n        }\n      }\n    };\n  }\n\n  /**\n   * Persists the valid records to corresponding block files.\n   */\n  public static class RecordOutputOperator extends AbstractFileOutputOperator AbstractBlockReader.ReaderRecord String \n  {\n    @Override\n    protected String getFileName(AbstractBlockReader.ReaderRecord String  tuple)\n    {\n      return Long.toHexString(tuple.getBlockId());\n    }\n\n    @Override\n    protected byte[] getBytesForTup
 le(AbstractBlockReader.ReaderRecord String  tuple)\n    {\n      return tuple.getRecord().getBytes();\n    }\n  }\n}  Configuration to parallel partition block reader with its downstream operators.     property \n     name dt.operator.Filter.port.input.attr.PARTITION_PARALLEL /name \n     value true /value \n   /property \n   property \n     name dt.operator.Record-writer.port.input.attr.PARTITION_PARALLEL /name \n     value true /value \n   /property", 
+            "title": "Example Application"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractfsreadaheadlinereader", 
+            "text": "This extension of  AbstractFSBlockReader  parses lines from a block and binds the  readerContext  field to an instance of  ReaderContext.ReadAheadLineReaderContext .  It is abstract because it doesn't provide an implementation of  convertToRecord(byte[] bytes)  since the user may want to convert the bytes that make a line into some other type.", 
+            "title": "AbstractFSReadAheadLineReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#readaheadlinereadercontext", 
+            "text": "In order to handle a line split across adjacent blocks, ReadAheadLineReaderContext always reads beyond the block boundary and ignores the bytes till the first end-of-line character of all the blocks except the first block of the file. This ensures that no line is missed or incomplete.  This is one of the most common ways of handling a split record. It doesn't require any further information to decide if a line is complete. However, the cost of this consistent way to handle a line split is that it always reads from the next block.", 
+            "title": "ReadAheadLineReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#abstractfslinereader", 
+            "text": "Similar to  AbstractFSReadAheadLineReader , even this parses lines from a block. However, it binds the  readerContext  field to an instance of  ReaderContext.LineReaderContext .", 
+            "title": "AbstractFSLineReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#linereadercontext", 
+            "text": "This handles the line split differently from  ReadAheadLineReaderContext . It doesn't always read from the next block. If the end of the last line is aligned with the block boundary then it stops processing the block. It does read from the next block when the boundaries are not aligned, that is, last line extends beyond the block boundary. The result of this is an inconsistency in reading the next block.  When the boundary of the last line of the previous block was aligned with its block, then the first line of the current block is a valid line. However, in the other case the bytes from the block start offset to the first end-of-line character should be ignored. Therefore, this means that any record formed by this reader context has to be validated. For example, if the lines are of fixed size then size of each record can be validated or if each line begins with a special field then that knowledge can be used to check if a record is complete.  If the validations 
 of completeness fails for a line then  convertToRecord(byte[] bytes)  should return null.", 
+            "title": "LineReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#fsslicereader", 
+            "text": "A concrete extension of  AbstractFSBlockReader  that reads fixed-size  byte[]  from a block and emits the byte array wrapped in  com.datatorrent.netlet.util.Slice .  This operator binds the  readerContext  to an instance of  ReaderContext.FixedBytesReaderContext .", 
+            "title": "FSSliceReader"
+        }, 
+        {
+            "location": "/operators/block_reader/#fixedbytesreadercontext", 
+            "text": "This implementation of  ReaderContext  never reads beyond a block boundary which can result in the last  byte[]  of a block to be of a shorter length than the rest of the records.", 
+            "title": "FixedBytesReaderContext"
+        }, 
+        {
+            "location": "/operators/block_reader/#configuration_1", 
+            "text": "readerContext.length : length of each record. By default, this is initialized to the default hdfs block size.", 
+            "title": "Configuration"
+        }, 
+        {
+            "location": "/operators/block_reader/#partitioner-and-statslistener", 
+            "text": "The logical instance of the block reader acts as the Partitioner (unless a custom partitioner is set using the operator attribute -  PARTITIONER ) as well as a StatsListener. This is because the  AbstractBlockReader  implements both the  com.datatorrent.api.Partitioner  and  com.datatorrent.api.StatsListener  interfaces and provides an implementation of  definePartitions(...)  and  processStats(...)  which make it auto-scalable.", 
+            "title": "Partitioner and StatsListener"
+        }, 
+ 

<TRUNCATED>