Posted to commits@apex.apache.org by ra...@apache.org on 2016/10/13 17:41:14 UTC

[2/2] apex-malhar git commit: APEXMALHAR-2242 Additions and updates to the documentation.

APEXMALHAR-2242 Additions and updates to the documentation.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/352e2d92
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/352e2d92
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/352e2d92

Branch: refs/heads/master
Commit: 352e2d92cb551c0fa4489f729379c32ad47b08a2
Parents: bd502e7
Author: Thomas Weise <th...@apache.org>
Authored: Wed Oct 12 20:56:39 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Oct 13 10:24:45 2016 -0700

----------------------------------------------------------------------
 docs/operators/kafkaInputOperator.md | 160 +++++++++++++++---------------
 1 file changed, 82 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/352e2d92/docs/operators/kafkaInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index cb29e5d..2886b4b 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -1,19 +1,30 @@
 KAFKA INPUT OPERATOR
 =====================
 
-### Introduction: About Kafka Input Operator
+### Introduction
 
-This is an input operator that consumes data from Kafka messaging system for further processing in Apex. Kafka Input Operator is an fault-tolerant and scalable Malhar Operator.
+[Apache Kafka](http://kafka.apache.org) is a pull-based, distributed publish-subscribe messaging system
+in which topics are partitioned and replicated across nodes.
 
-### Why is it needed ?
+The Kafka input operator consumes data from the partitions of a Kafka topic for processing in Apex. 
+The operator has the ability to automatically scale with the Kafka partitioning for high throughput. 
+It is fault-tolerant (consumer offset checkpointing) and guarantees idempotency to allow exactly-once results in the downstream pipeline.
 
-Kafka is a pull-based and distributed publish subscribe messaging system, topics are partitioned and replicated across
-nodes. Kafka input operator is needed when you want to read data from multiple
-partitions of a Kafka topic in parallel in an Apex application.
+For more information about the operator design see this [presentation](http://www.slideshare.net/ApacheApex/apache-apex-kafka-input-operator)
+and for processing guarantees this [blog](https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/).
 
-### 0.8 Version of Kafka Input Operator
+There are two separate implementations of the input operator,
+one built against Kafka 0.8 client and a newer version for the
+Kafka 0.9 consumer API that also works with MapR Streams.
+These reside in different packages and are described separately below.
 
-### AbstractKafkaInputOperator (Package: com.datatorrent.contrib.kafka)
+### Kafka Input Operator for Kafka 0.8.x
+
+Package: `com.datatorrent.contrib.kafka`
+
+Maven artifact: [malhar-contrib](https://mvnrepository.com/artifact/org.apache.apex/malhar-contrib)
+
+### AbstractKafkaInputOperator
 
 This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn't have any ports.
 
@@ -77,8 +88,7 @@ Default Value = ONE_TO_ONE</p></td>
 
 #### Abstract Methods
 
-void emitTuple(Message message): Abstract method that emits tuples
-extracted from Kafka message.
+`void emitTuple(Message message)`: Abstract method that emits tuples extracted from Kafka message.
 
 ### KafkaConsumer
 
@@ -92,8 +102,8 @@ functionality of High Level Consumer API.
 
 ### Pre-requisites
 
-This operator referred the Kafka Consumer API of version
-0.8.1.1. So, this operator will work with any 0.8.x and 0.7.x version of Apache Kafka.
+This operator uses the Kafka 0.8.2.1 client consumer API
+and will work with 0.8.x and 0.7.x versions of Kafka broker.
 
 #### Configuration Parameters
 
@@ -197,9 +207,9 @@ public interface OffsetManager
 ```
 #### Abstract Methods
 
-Map &lt;KafkaPartition, Long&gt; loadInitialOffsets(): Specifies the initial offset for consuming messages; called at the activation stage.
+`Map <KafkaPartition, Long> loadInitialOffsets()`: Specifies the initial offset for consuming messages; called at the activation stage.
 
-updateOffsets(Map &lt;KafkaPartition, Long&gt; offsetsOfPartitions): This
+`updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)`: This
 method is called at every repartitionCheckInterval to update offsets.
 
 ### Partitioning
@@ -228,25 +238,20 @@ parameter repartitionInterval value to a negative value.
 
 ### AbstractSinglePortKafkaInputOperator
 
-This class extends AbstractKafkaInputOperator and having single output
-port, will emit the messages through this port.
+This class extends AbstractKafkaInputOperator to emit messages through single output port.
 
 #### Ports
 
-outputPort &lt;T&gt;: Tuples extracted from Kafka messages are emitted through
-this port.
+`outputPort <T>`: Tuples extracted from Kafka messages are emitted through this port.
 
 #### Abstract Methods
 
-T getTuple(Message msg) : Converts the Kafka message to tuple.
+`T getTuple(Message msg)`: Converts the Kafka message to tuple.
 
 ### Concrete Classes
 
-1.  KafkaSinglePortStringInputOperator :
-This class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts string from Kafka message.
-
-2.  KafkaSinglePortByteArrayInputOperator:
-This class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts byte array from Kafka message.
+1. KafkaSinglePortStringInputOperator: extends `AbstractSinglePortKafkaInputOperator`, extracts string from Kafka message.
+2. KafkaSinglePortByteArrayInputOperator: extends `AbstractSinglePortKafkaInputOperator`, extracts byte array from Kafka message.
 
 ### Application Example
 
@@ -257,15 +262,13 @@ Below is the code snippet:
 @ApplicationAnnotation(name = "KafkaApp")
 public class ExampleKafkaApplication implements StreamingApplication
 {
-@Override
-public void populateDAG(DAG dag, Configuration entries)
-{
-  KafkaSinglePortByteArrayInputOperator input =  dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator());
-
-  ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
-
-  dag.addStream("MessageData", input.outputPort, output.input);
-}
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    KafkaSinglePortByteArrayInputOperator input =  dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator());
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+    dag.addStream("MessageData", input.outputPort, output.input);
+  }
 }
 ```
 Below is the configuration for “test” Kafka topic name and
@@ -273,27 +276,32 @@ Below is the configuration for “test” Kafka topic name and
 
 ```xml
 <property>
-<name>dt.operator.MessageReader.prop.topic</name>
-<value>test</value>
+  <name>dt.operator.MessageReader.prop.topic</name>
+  <value>test</value>
 </property>
 
 <property>
-<name>dt.operator.KafkaInputOperator.prop.zookeeper</name>
-<value>localhost:2181</value>
+  <name>dt.operator.KafkaInputOperator.prop.zookeeper</name>
+  <value>localhost:2181</value>
 </property>
 ```
 
 
-### 0.9 Version of Kafka Input Operator
+### Kafka Input Operator for Kafka 0.9.x
+
+Package: `org.apache.apex.malhar.kafka`
 
-### AbstractKafkaInputOperator (Package: org.apache.apex.malhar.kafka)
+Maven Artifact: [malhar-kafka](https://mvnrepository.com/artifact/org.apache.apex/malhar-kafka)
 
-This version uses the new 0.9 version of consumer API and features of this version are described here. This operator is fault-tolerant, scalable, multi-cluster and multi-topic support.
+This version uses the new 0.9 version of consumer API and works with Kafka broker version 0.9 and later.
+The operator is fault-tolerant, scalable and supports input from multiple clusters and multiple topics in a single operator instance.
 
 #### Pre-requisites
 
 This operator requires version 0.9.0 or later of the Kafka Consumer API.
 
+### AbstractKafkaInputOperator
+
 #### Ports
 ----------
 
@@ -311,11 +319,11 @@ This abstract class doesn't have any ports.
     -   Specified the Kafka topics that you want to consume messages from. If you want multi-topic support, then specify the topics separated by ",".
 
 -   ***strategy*** - PartitionStrategy
-    -   Operator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.
+    -   Operator supports two types of partitioning strategies, `ONE_TO_ONE` and `ONE_TO_MANY`.
     
-        ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.
-        ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.
-        Default Value = PartitionStrategy.ONE_TO_ONE.
+        `ONE_TO_ONE`: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.
+        `ONE_TO_MANY`: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.
+        Default Value = `PartitionStrategy.ONE_TO_ONE`.
 
 -   ***initialPartitionCount*** - Integer
     -   When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. 
@@ -331,23 +339,22 @@ This abstract class doesn't have any ports.
 
 -   ***maxTuplesPerWindow*** - Integer
     -   Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. 
-        Default value = MAX_VALUE 
+        Default value = `MAX_VALUE` 
 
 -   ***initialOffset*** - InitialOffset
-    -   Indicates the type of offset i.e, “EARLIEST or LATEST or APPLICATION_OR_EARLIEST or APPLICATION_OR_LATEST”. 
-        LATEST => Operator consume messages from latest point of Kafka queue. 
-        EARLIEST => Operator consume messages starting from message queue.
-        APPLICATION_OR_EARLIEST => Operator consume messages from committed position from last run. If there is no committed offset, then it starts consuming from beginning of kafka queue.
-        APPLICATION_OR_LATEST => Operator consume messages from committed position from last run. If there is not committed offset, then it starts consuming from latest position of queue.
-        Default value = InitialOffset.APPLICATION_OR_LATEST
+    -   Indicates the type of offset i.e, `EARLIEST` or `LATEST` or `APPLICATION_OR_EARLIEST` or `APPLICATION_OR_LATEST`. 
+        `LATEST` => Consume new messages from latest offset in the topic. 
+        `EARLIEST` => Consume all messages available in the topic.
+        `APPLICATION_OR_EARLIEST` => Consume messages from committed position from last run. If there is no committed offset, then start consuming from beginning.
+        `APPLICATION_OR_LATEST` => Consumes messages from committed position from last run. If a committed offset is unavailable, then start consuming from latest position.
+        Default value = `InitialOffset.APPLICATION_OR_LATEST`
 
 -   ***metricsRefreshInterval*** - Long
     -   Interval specified in milliseconds. This value specifies the minimum interval between two metric stat updates.
         Default value = 5 Seconds.
 
 -   ***consumerTimeout*** - Long
-    -   Indicates the time waiting in poll data if data is not available. Please refer the below link:
-        http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll
+    -   Indicates the [time waiting in poll](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll) when data is not available.
         Default value = 5 Seconds.
 
 -   ***holdingBufferSize*** - Long
@@ -355,25 +362,24 @@ This abstract class doesn't have any ports.
         Default value = 1024.
 
 -   ***consumerProps*** - Properties
-    -   Specify the consumer properties which are not yet set to the operator. Please refer the below link for consumer properties:
-        http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+    -   Specify the [consumer properties](http://kafka.apache.org/090/documentation.html#newconsumerconfigs) which are not yet set to the operator.
 
 -   ***windowDataManager*** - WindowDataManager
-    -   Specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window.
-        Default value = WindowDataManager.NoopWindowDataManager.
+    -   If set to a value other than the default, such as `FSWindowDataManager`, specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window.
+        Default value = `WindowDataManager.NoopWindowDataManager`.
         
 #### Abstract Methods
 
-void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message): Abstract method that emits tuples
+`void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)`: Abstract method that emits tuples
 extracted from Kafka message.
 
 ### Concrete Classes
 
 #### KafkaSinglePortInputOperator
-This class extends from AbstractKafkaInputOperator and define the getTuple() method which extracts byte array from Kafka message.
+This class extends from AbstractKafkaInputOperator and defines the `getTuple()` method which extracts byte array from Kafka message.
 
 #### Ports
-outputPort <byte[]>: Tuples extracted from Kafka messages are emitted through this port.
+`outputPort <byte[]>`: Tuples extracted from Kafka messages are emitted through this port.
 
 ### Application Example
 This section builds an Apex application using Kafka input operator.
@@ -383,15 +389,13 @@ Below is the code snippet:
 @ApplicationAnnotation(name = "KafkaApp")
 public class ExampleKafkaApplication implements StreamingApplication
 {
-@Override
-public void populateDAG(DAG dag, Configuration entries)
-{
-  KafkaSinglePortInputOperator input =  dag.addOperator("MessageReader", new KafkaSinglePortInputOperator());
-
-  ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
-
-  dag.addStream("MessageData", input.outputPort, output.input);
-}
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    KafkaSinglePortInputOperator input =  dag.addOperator("MessageReader", new KafkaSinglePortInputOperator());
+    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+    dag.addStream("MessageData", input.outputPort, output.input);
+  }
 }
 ```
 Below is the configuration for “test” Kafka topic name and
@@ -399,13 +403,13 @@ Below is the configuration for “test” Kafka topic name and
 
 ```xml
 <property>
-<name>dt.operator.MessageReader.prop.topics</name>
-<value>test</value>
+  <name>dt.operator.MessageReader.prop.topics</name>
+  <value>test</value>
 </property>
 
 <property>
-<name>dt.operator.KafkaInputOperator.prop.clusters</name>
-<value>localhost:9092</value>
+  <name>dt.operator.KafkaInputOperator.prop.clusters</name>
+  <value>localhost:9092</value>
 </property>
 ```
 
@@ -413,14 +417,14 @@ By adding following lines to properties file, Kafka Input Operator supports mult
  
 ```xml
 <property>
-<name>dt.operator.MessageReader.prop.topics</name>
-<value>test1, test2</value>
+  <name>dt.operator.MessageReader.prop.topics</name>
+  <value>test1, test2</value>
 </property>
  
 <property>
-<name>dt.operator.KafkaInputOperator.prop.clusters</name>
-<value>localhost:9092; localhost:9093; localhost:9094</value>
+  <name>dt.operator.KafkaInputOperator.prop.clusters</name>
+  <value>localhost:9092; localhost:9093; localhost:9094</value>
 </property>
 ```
 
-For more details about example application, Please refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka.
+For a full example application project, refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka
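
Note on the `ONE_TO_MANY` strategy documented in the diff above: the rule K = min(initialPartitionCount, N) with round-robin assignment of the remaining partitions can be illustrated with a small stand-alone sketch. This is not Malhar code and does not use the operator API; the class name `OneToManySketch`, the method `assign`, and the exact assignment order are assumptions made for illustration only, and the real operator may order partitions differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the ONE_TO_MANY arithmetic; not part of apex-malhar.
final class OneToManySketch {

    // Returns, for each operator instance, the list of Kafka partition ids it would read.
    static List<List<Integer>> assign(int initialPartitionCount, int kafkaPartitions) {
        if (initialPartitionCount < 1) {
            throw new IllegalArgumentException("initialPartitionCount must be >= 1");
        }
        // K = min(initialPartitionCount, N), as stated in the documentation.
        int k = Math.min(initialPartitionCount, kafkaPartitions);
        List<List<Integer>> instances = new ArrayList<>();
        for (int i = 0; i < k; i++) {
            instances.add(new ArrayList<>());
        }
        // Assumed round-robin order: partition p goes to instance p mod K.
        for (int p = 0; p < kafkaPartitions; p++) {
            instances.get(p % k).add(p);
        }
        return instances;
    }

    public static void main(String[] args) {
        // initialPartitionCount = 5, N = 2: two instances, one partition each.
        System.out.println(assign(5, 2)); // prints [[0], [1]]
        // initialPartitionCount = 2, N = 5: two instances share five partitions.
        System.out.println(assign(2, 5)); // prints [[0, 2, 4], [1, 3]]
    }
}
```

The first call matches the example in the documentation (initialPartitionCount = 5, two Kafka partitions, two operator instances); the second shows the case where K is less than N and partitions are shared.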