Posted to commits@apex.apache.org by ra...@apache.org on 2016/10/13 17:41:14 UTC
[2/2] apex-malhar git commit: APEXMALHAR-2242 Additions and updates
to the documentation.
APEXMALHAR-2242 Additions and updates to the documentation.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/352e2d92
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/352e2d92
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/352e2d92
Branch: refs/heads/master
Commit: 352e2d92cb551c0fa4489f729379c32ad47b08a2
Parents: bd502e7
Author: Thomas Weise <th...@apache.org>
Authored: Wed Oct 12 20:56:39 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Thu Oct 13 10:24:45 2016 -0700
----------------------------------------------------------------------
docs/operators/kafkaInputOperator.md | 160 +++++++++++++++---------------
1 file changed, 82 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/352e2d92/docs/operators/kafkaInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/kafkaInputOperator.md b/docs/operators/kafkaInputOperator.md
index cb29e5d..2886b4b 100644
--- a/docs/operators/kafkaInputOperator.md
+++ b/docs/operators/kafkaInputOperator.md
@@ -1,19 +1,30 @@
KAFKA INPUT OPERATOR
=====================
-### Introduction: About Kafka Input Operator
+### Introduction
-This is an input operator that consumes data from Kafka messaging system for further processing in Apex. Kafka Input Operator is an fault-tolerant and scalable Malhar Operator.
+[Apache Kafka](http://kafka.apache.org) is a pull-based, distributed publish-subscribe messaging system
+in which topics are partitioned and replicated across nodes.
-### Why is it needed ?
+The Kafka input operator consumes data from the partitions of a Kafka topic for processing in Apex.
+The operator has the ability to automatically scale with the Kafka partitioning for high throughput.
+It is fault-tolerant (consumer offset checkpointing) and guarantees idempotency to allow exactly-once results in the downstream pipeline.
-Kafka is a pull-based and distributed publish subscribe messaging system, topics are partitioned and replicated across
-nodes. Kafka input operator is needed when you want to read data from multiple
-partitions of a Kafka topic in parallel in an Apex application.
+For more information about the operator design see this [presentation](http://www.slideshare.net/ApacheApex/apache-apex-kafka-input-operator)
+and for processing guarantees this [blog](https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/).
-### 0.8 Version of Kafka Input Operator
+There are two separate implementations of the input operator,
+one built against the Kafka 0.8 client and a newer version for the
+Kafka 0.9 consumer API that also works with MapR Streams.
+These reside in different packages and are described separately below.
-### AbstractKafkaInputOperator (Package: com.datatorrent.contrib.kafka)
+### Kafka Input Operator for Kafka 0.8.x
+
+Package: `com.datatorrent.contrib.kafka`
+
+Maven artifact: [malhar-contrib](https://mvnrepository.com/artifact/org.apache.apex/malhar-contrib)
+
+### AbstractKafkaInputOperator
This is the abstract implementation that serves as base class for consuming messages from Kafka messaging system. This class doesn't have any ports.
@@ -77,8 +88,7 @@ Default Value = ONE_TO_ONE</p></td>
#### Abstract Methods
-void emitTuple(Message message): Abstract method that emits tuples
-extracted from Kafka message.
+`void emitTuple(Message message)`: Abstract method that emits tuples extracted from Kafka message.
### KafkaConsumer
@@ -92,8 +102,8 @@ functionality of High Level Consumer API.
### Pre-requisites
-This operator referred the Kafka Consumer API of version
-0.8.1.1. So, this operator will work with any 0.8.x and 0.7.x version of Apache Kafka.
+This operator uses the Kafka 0.8.2.1 client consumer API
+and will work with 0.8.x and 0.7.x versions of Kafka broker.
#### Configuration Parameters
@@ -197,9 +207,9 @@ public interface OffsetManager
```
#### Abstract Methods
-Map <KafkaPartition, Long> loadInitialOffsets(): Specifies the initial offset for consuming messages; called at the activation stage.
+`Map <KafkaPartition, Long> loadInitialOffsets()`: Specifies the initial offset for consuming messages; called at the activation stage.
-updateOffsets(Map <KafkaPartition, Long> offsetsOfPartitions): This
+`updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)`: This
method is called at every repartitionCheckInterval to update offsets.
### Partitioning
@@ -228,25 +238,20 @@ parameter repartitionInterval value to a negative value.
### AbstractSinglePortKafkaInputOperator
-This class extends AbstractKafkaInputOperator and having single output
-port, will emit the messages through this port.
+This class extends AbstractKafkaInputOperator to emit messages through a single output port.
#### Ports
-outputPort <T>: Tuples extracted from Kafka messages are emitted through
-this port.
+`outputPort <T>`: Tuples extracted from Kafka messages are emitted through this port.
#### Abstract Methods
-T getTuple(Message msg) : Converts the Kafka message to tuple.
+`T getTuple(Message msg)`: Converts the Kafka message to tuple.
### Concrete Classes
-1. KafkaSinglePortStringInputOperator :
-This class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts string from Kafka message.
-
-2. KafkaSinglePortByteArrayInputOperator:
-This class extends AbstractSinglePortKafkaInputOperator and getTuple() method extracts byte array from Kafka message.
+1. KafkaSinglePortStringInputOperator: extends `AbstractSinglePortKafkaInputOperator`, extracts string from Kafka message.
+2. KafkaSinglePortByteArrayInputOperator: extends `AbstractSinglePortKafkaInputOperator`, extracts byte array from Kafka message.
### Application Example
@@ -257,15 +262,13 @@ Below is the code snippet:
@ApplicationAnnotation(name = "KafkaApp")
public class ExampleKafkaApplication implements StreamingApplication
{
-@Override
-public void populateDAG(DAG dag, Configuration entries)
-{
- KafkaSinglePortByteArrayInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator());
-
- ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
-
- dag.addStream("MessageData", input.outputPort, output.input);
-}
+ @Override
+ public void populateDAG(DAG dag, Configuration entries)
+ {
+ KafkaSinglePortByteArrayInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator());
+ ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+ dag.addStream("MessageData", input.outputPort, output.input);
+ }
}
```
Below is the configuration for “test” Kafka topic name and
@@ -273,27 +276,32 @@ Below is the configuration for “test” Kafka topic name and
```xml
<property>
-<name>dt.operator.MessageReader.prop.topic</name>
-<value>test</value>
+ <name>dt.operator.MessageReader.prop.topic</name>
+ <value>test</value>
</property>
<property>
-<name>dt.operator.KafkaInputOperator.prop.zookeeper</name>
-<value>localhost:2181</value>
+ <name>dt.operator.KafkaInputOperator.prop.zookeeper</name>
+ <value>localhost:2181</value>
</property>
```
-### 0.9 Version of Kafka Input Operator
+### Kafka Input Operator for Kafka 0.9.x
+
+Package: `org.apache.apex.malhar.kafka`
-### AbstractKafkaInputOperator (Package: org.apache.apex.malhar.kafka)
+Maven Artifact: [malhar-kafka](https://mvnrepository.com/artifact/org.apache.apex/malhar-kafka)
-This version uses the new 0.9 version of consumer API and features of this version are described here. This operator is fault-tolerant, scalable, multi-cluster and multi-topic support.
+This version uses the new 0.9 version of consumer API and works with Kafka broker version 0.9 and later.
+The operator is fault-tolerant, scalable and supports input from multiple clusters and multiple topics in a single operator instance.
#### Pre-requisites
This operator requires version 0.9.0 or later of the Kafka Consumer API.
+### AbstractKafkaInputOperator
+
#### Ports
----------
@@ -311,11 +319,11 @@ This abstract class doesn't have any ports.
- Specifies the Kafka topics that you want to consume messages from. If you want multi-topic support, then specify the topics separated by ",".
- ***strategy*** - PartitionStrategy
- - Operator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.
+ - Operator supports two types of partitioning strategies, `ONE_TO_ONE` and `ONE_TO_MANY`.
- ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.
- ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.
- Default Value = PartitionStrategy.ONE_TO_ONE.
+ `ONE_TO_ONE`: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.
+ `ONE_TO_MANY`: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.
+ Default Value = `PartitionStrategy.ONE_TO_ONE`.
- ***initialPartitionCount*** - Integer
- When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances.
@@ -331,23 +339,22 @@ This abstract class doesn't have any ports.
- ***maxTuplesPerWindow*** - Integer
- Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1.
- Default value = MAX_VALUE
+ Default value = `MAX_VALUE`
- ***initialOffset*** - InitialOffset
- - Indicates the type of offset i.e, “EARLIEST or LATEST or APPLICATION_OR_EARLIEST or APPLICATION_OR_LATEST”.
- LATEST => Operator consume messages from latest point of Kafka queue.
- EARLIEST => Operator consume messages starting from message queue.
- APPLICATION_OR_EARLIEST => Operator consume messages from committed position from last run. If there is no committed offset, then it starts consuming from beginning of kafka queue.
- APPLICATION_OR_LATEST => Operator consume messages from committed position from last run. If there is not committed offset, then it starts consuming from latest position of queue.
- Default value = InitialOffset.APPLICATION_OR_LATEST
+ - Indicates the type of offset, i.e., `EARLIEST`, `LATEST`, `APPLICATION_OR_EARLIEST` or `APPLICATION_OR_LATEST`.
+ `LATEST` => Consume new messages from the latest offset in the topic.
+ `EARLIEST` => Consume all messages available in the topic.
+ `APPLICATION_OR_EARLIEST` => Consume messages from the committed position of the last run. If there is no committed offset, start consuming from the beginning.
+ `APPLICATION_OR_LATEST` => Consume messages from the committed position of the last run. If there is no committed offset, start consuming from the latest position.
+ Default value = `InitialOffset.APPLICATION_OR_LATEST`
- ***metricsRefreshInterval*** - Long
- Interval specified in milliseconds. This value specifies the minimum interval between two metric stat updates.
Default value = 5 Seconds.
- ***consumerTimeout*** - Long
- - Indicates the time waiting in poll data if data is not available. Please refer the below link:
- http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll
+ - Indicates the [time waiting in poll](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll) when data is not available.
Default value = 5 Seconds.
- ***holdingBufferSize*** - Long
@@ -355,25 +362,24 @@ This abstract class doesn't have any ports.
Default value = 1024.
- ***consumerProps*** - Properties
- - Specify the consumer properties which are not yet set to the operator. Please refer the below link for consumer properties:
- http://kafka.apache.org/090/documentation.html#newconsumerconfigs
+ - Specify the [consumer properties](http://kafka.apache.org/090/documentation.html#newconsumerconfigs) which are not yet set to the operator.
- ***windowDataManager*** - WindowDataManager
- - Specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window.
- Default value = WindowDataManager.NoopWindowDataManager.
+ - If set to a value other than the default, such as `FSWindowDataManager`, specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window.
+ Default value = `WindowDataManager.NoopWindowDataManager`.
#### Abstract Methods
-void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message): Abstract method that emits tuples
+`void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)`: Abstract method that emits tuples
extracted from Kafka message.
### Concrete Classes
#### KafkaSinglePortInputOperator
-This class extends from AbstractKafkaInputOperator and define the getTuple() method which extracts byte array from Kafka message.
+This class extends AbstractKafkaInputOperator and defines the `getTuple()` method, which extracts a byte array from the Kafka message.
#### Ports
-outputPort <byte[]>: Tuples extracted from Kafka messages are emitted through this port.
+`outputPort <byte[]>`: Tuples extracted from Kafka messages are emitted through this port.
### Application Example
This section builds an Apex application using Kafka input operator.
@@ -383,15 +389,13 @@ Below is the code snippet:
@ApplicationAnnotation(name = "KafkaApp")
public class ExampleKafkaApplication implements StreamingApplication
{
-@Override
-public void populateDAG(DAG dag, Configuration entries)
-{
- KafkaSinglePortInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortInputOperator());
-
- ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
-
- dag.addStream("MessageData", input.outputPort, output.input);
-}
+ @Override
+ public void populateDAG(DAG dag, Configuration entries)
+ {
+ KafkaSinglePortInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortInputOperator());
+ ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
+ dag.addStream("MessageData", input.outputPort, output.input);
+ }
}
```
Below is the configuration for “test” Kafka topic name and
@@ -399,13 +403,13 @@ Below is the configuration for “test” Kafka topic name and
```xml
<property>
-<name>dt.operator.MessageReader.prop.topics</name>
-<value>test</value>
+ <name>dt.operator.MessageReader.prop.topics</name>
+ <value>test</value>
</property>
<property>
-<name>dt.operator.KafkaInputOperator.prop.clusters</name>
-<value>localhost:9092</value>
+ <name>dt.operator.KafkaInputOperator.prop.clusters</name>
+ <value>localhost:9092</value>
</property>
```
@@ -413,14 +417,14 @@ By adding following lines to properties file, Kafka Input Operator supports mult
```xml
<property>
-<name>dt.operator.MessageReader.prop.topics</name>
-<value>test1, test2</value>
+ <name>dt.operator.MessageReader.prop.topics</name>
+ <value>test1, test2</value>
</property>
<property>
-<name>dt.operator.KafkaInputOperator.prop.clusters</name>
-<value>localhost:9092; localhost:9093; localhost:9094</value>
+ <name>dt.operator.KafkaInputOperator.prop.clusters</name>
+ <value>localhost:9092; localhost:9093; localhost:9094</value>
</property>
```
-For more details about example application, Please refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka.
+For a full example application project, refer to https://github.com/DataTorrent/examples/tree/master/tutorials/kafka
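
Reviewer note (not part of the commit): the `ONE_TO_MANY` description in the patch above can be illustrated with a small standalone sketch. It assumes Kafka partitions are numbered 0..N-1 and that round-robin means partition p goes to instance p mod K. The class and method names are hypothetical, and this is not the Malhar partitioner code, only an illustration of the arithmetic the documentation describes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of apex-malhar.
class OneToManyAssignmentSketch
{
  /**
   * Returns, for each operator instance, the list of Kafka partition ids it would read,
   * assuming K = min(initialPartitionCount, N) instances and round-robin assignment.
   */
  static List<List<Integer>> assign(int initialPartitionCount, int kafkaPartitions)
  {
    if (initialPartitionCount < 1 || kafkaPartitions < 0) {
      throw new IllegalArgumentException("initialPartitionCount must be >= 1 and kafkaPartitions >= 0");
    }
    // K = min(initialPartitionCount, N), as stated in the documentation text.
    int k = Math.min(initialPartitionCount, kafkaPartitions);
    List<List<Integer>> instances = new ArrayList<>();
    for (int i = 0; i < k; i++) {
      instances.add(new ArrayList<Integer>());
    }
    // Assumed round-robin rule: partition p is handled by instance p mod K.
    for (int p = 0; p < kafkaPartitions; p++) {
      instances.get(p % k).add(p);
    }
    return instances;
  }

  public static void main(String[] args)
  {
    // Documentation example: initialPartitionCount = 5, N = 2 gives 2 instances.
    System.out.println(assign(5, 2)); // prints [[0], [1]]
    // Fewer instances than partitions: partitions are spread round-robin.
    System.out.println(assign(2, 5)); // prints [[0, 2, 4], [1, 3]]
  }
}
```

With `initialPartitionCount = 5` and two Kafka partitions the sketch yields two instances, matching the example in the documentation; with `initialPartitionCount = 2` and five partitions, each of the two instances reads several partitions.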