Posted to commits@samza.apache.org by ja...@apache.org on 2018/09/28 21:28:53 UTC

samza git commit: Updated doc for Kinesis Connector

Repository: samza
Updated Branches:
  refs/heads/master 1e0c81b60 -> 11471672d


Updated doc for Kinesis Connector

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #667 from atoomula/kinesisdocs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/11471672
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/11471672
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/11471672

Branch: refs/heads/master
Commit: 11471672d618485118d843aaefdea367f07a8dec
Parents: 1e0c81b
Author: Aditya Toomula <at...@linkedin.com>
Authored: Fri Sep 28 14:28:50 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Sep 28 14:28:50 2018 -0700

----------------------------------------------------------------------
 .../documentation/versioned/aws/kinesis.md      | 104 +++++++++++--------
 docs/learn/documentation/versioned/index.html   |   2 +-
 2 files changed, 63 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/11471672/docs/learn/documentation/versioned/aws/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/aws/kinesis.md b/docs/learn/documentation/versioned/aws/kinesis.md
index a4be3dd..a866484 100644
--- a/docs/learn/documentation/versioned/aws/kinesis.md
+++ b/docs/learn/documentation/versioned/aws/kinesis.md
@@ -1,6 +1,6 @@
 ---
 layout: page
-title: Connecting to Kinesis
+title: Kinesis Connector
 ---
 <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
@@ -19,86 +19,106 @@ title: Connecting to Kinesis
    limitations under the License.
 -->
 
-You can configure your Samza jobs to process data from [AWS Kinesis](https://aws.amazon.com/kinesis/data-streams), Amazon's data streaming service. A `Kinesis data stream` is similar to a Kafka topic and can have multiple partitions. Each message consumed from a Kinesis data stream is an instance of [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
+## Overview
 
-### Consuming from Kinesis:
+The Samza Kinesis connector provides access to [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams),
+Amazon’s data streaming service. A Kinesis Data Stream is similar to a Kafka topic and can have multiple partitions.
+Each message consumed from a Kinesis Data Stream is an instance of [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
+Samza’s [KinesisSystemConsumer](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java)
+wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java).
 
-Samza's [KinesisSystemConsumer](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java) wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java). The key of the message is set to partition key of the Record. The message is obtained from the Record body.
+## Consuming from Kinesis
 
-To configure Samza to consume from Kinesis streams:
+### Basic Configuration
+
+You can configure your Samza jobs to process data from Kinesis streams. To configure a Samza job to consume from Kinesis
+streams, add the following configuration:
 
 {% highlight jproperties %}
-# define a kinesis system factory with your identifier. eg: kinesis-system
+// define a kinesis system factory with your identifier. eg: kinesis-system
 systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
 
-# kinesis system consumer works with only AllSspToSingleTaskGrouperFactory
+// kinesis system consumer works with only AllSspToSingleTaskGrouperFactory
 job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
 
-# define your streams
+// define your streams
 task.inputs=kinesis-system.input0
 
-# define required properties for your streams
+// define required properties for your streams
 systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
 systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS-KEY
 sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 {% endhighlight %}
 
-The tuple required to access the Kinesis data stream must be provided, namely the fields `YOUR-STREAM-REGION`, `YOUR-ACCESS-KEY`, `YOUR-SECRET-KEY`.
+The tuple required to access the Kinesis data stream must be provided, namely the following fields:<br>
+**YOUR-STREAM-REGION**, **YOUR-ACCESS-KEY**, **YOUR-SECRET-KEY**.
+
 
-#### Advanced Configuration:
+### Advanced Configuration
 
-##### AWS Client Configs:
+#### AWS Client configs
+You can configure any [AWS client config](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html)
+with the prefix **systems.system-name.aws.clientConfig.***
 
-You can configure any [AWS client config](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) with the prefix `system.system-name.aws.clientConfig.*`
 {% highlight jproperties %}
-system.system-name.aws.clientConfig.CONFIG-NAME=CONFIG-VALUE
+systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}
 
-As an example, to set a proxy host and proxy port for the AWS Client:
+As an example, to set a *proxy host* and *proxy port* for the AWS Client:
+
 {% highlight jproperties %}
 systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
 systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
 {% endhighlight %}
 
-##### KCL Configs:
+#### Kinesis Client Library Configs
+The Samza Kinesis connector uses the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
+(KCL) to access the Kinesis data streams. You can set any [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
+for a stream by configuring it under **systems.system-name.streams.stream-name.aws.kcl.***
 
-Similarly, you can set any [Kinesis Client Library config](https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java) for a stream by configuring it under `systems.system-name.streams.stream-name.aws.kcl.*`
 {% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.CONFIG-NAME=CONFIG-VALUE
+systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}
 
-As an example, to reset the checkpoint and set the starting position for a stream:
+Obtain the config param from the public functions in [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
+by removing the *"with"* prefix. For example, the config param corresponding to **withTableName()** is **TableName**.
+
+### Resetting Offsets
+
+The source of truth for checkpointing while using Kinesis Connector is not the Samza checkpoint topic but Kinesis itself.
+The Kinesis Client Library (KCL) [uses DynamoDB](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html)
+to store its checkpoints. By default, Kinesis Connector reads from the latest offset in the stream.
+
+To reset the checkpoints and consume from the earliest or latest offset of a Kinesis data stream, change the KCL TableName
+and set the appropriate starting position for the stream as shown below.
+
 {% highlight jproperties %}
+// change the TableName to a unique name to reset checkpoint.
 systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name
-# set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
+// set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
 systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=my-start-position
 {% endhighlight %}
 
-#### Limitations
+To manipulate checkpoints to start from a particular position in the Kinesis stream, in lieu of Samza CheckpointTool,
+please log in to the AWS Console and change the offsets in the DynamoDB Table with the table name that you have specified
+in the config above. By default, the table name has the following format:
+"\<job name\>-\<job id\>-\<kinesis stream\>".
 
-The following limitations apply for Samza jobs consuming from Kinesis streams using the Samza consumer:
-* Stateful processing (eg: windows or joins) is not supported on Kinesis streams. However, you can accomplish this by chaining two Samza jobs where the first job reads from Kinesis and sends to Kafka while the second job processes the data from Kafka.
-* Kinesis streams cannot be configured as [bootstrap](https://samza.apache.org/learn/documentation/latest/container/streams.html) or [broadcast](https://samza.apache.org/learn/documentation/latest/container/samza-container.html) streams.
-* Kinesis streams must be used with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java). No other grouper is supported.
-* A Samza job that consumes from Kinesis cannot consume from any other input source. However, you can send your results to any destination (eg: Kafka, EventHubs), and have another Samza job consume them.
+### Known Limitations
 
-### Producing to Kinesis:
+The following limitations apply to Samza jobs consuming from Kinesis streams using the Samza consumer:
 
-The KinesisSystemProducer for Samza is not yet implemented.
-
-### How to configure Samza job to consume from Kinesis data stream ?
-
-This tutorial uses [hello samza](../../../startup/hello-samza/{{site.version}}/) to illustrate running a Samza job on Yarn that consumes from Kinesis. We will use the [KinesisHelloSamza](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java) example.
-
-#### Update properties file
+- Stateful processing (eg: windows or joins) is not supported on Kinesis streams. However, you can accomplish this by
+chaining two Samza jobs where the first job reads from Kinesis and sends to Kafka while the second job processes the
+data from Kafka.
+- Kinesis streams cannot be configured as [bootstrap](https://samza.apache.org/learn/documentation/latest/container/streams.html)
+or [broadcast](https://samza.apache.org/learn/documentation/latest/container/samza-container.html) streams.
+- Kinesis streams must be used ONLY with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java)
+as the Kinesis consumer does the partition management by itself. No other grouper is supported.
+- A Samza job that consumes from Kinesis cannot consume from any other input source. However, you can send your results
+to any destination (eg: Kafka, EventHubs), and have another Samza job consume them.
 
-Update the following properties in the kinesis-hello-samza.properties file:
+## Producing to Kinesis
 
-{% highlight jproperties %}
-task.inputs=kinesis.<kinesis-stream>
-systems.kinesis.streams.<kinesis-stream>.aws.region=<kinesis-stream-region>
-systems.kinesis.streams.<kinesis-stream>.aws.accessKey=<your-access-key>
-sensitive.systems.kinesis.streams.<kinesis-stream>.aws.region=<your-secret-key>
-{% endhighlight %}
+The KinesisSystemProducer for Samza is not yet implemented.
 
-Now, you are ready to run your Samza application on Yarn as described [here](../../../startup/hello-samza/{{site.version}}/). Check the log file for messages read from your Kinesis stream.
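
Editor's note on the KCL config naming rule described in the updated doc (take a public with* method of KinesisClientLibConfiguration, drop the "with" prefix, and place the result under systems.system-name.streams.stream-name.aws.kcl.): the rule can be sketched in a few lines of Java. This is only an illustration. The class KclConfigKeys and its methods are hypothetical names invented for this note and are not part of Samza or the KCL. The system and stream names are the placeholders used in the doc.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Samza): derives per-stream KCL config keys. */
final class KclConfigKeys {
    private KclConfigKeys() {}

    /** Drops the "with" prefix from a KinesisClientLibConfiguration method name. */
    public static String paramName(String builderMethod) {
        if (!builderMethod.startsWith("with") || builderMethod.length() == 4) {
            throw new IllegalArgumentException("expected a with* method name: " + builderMethod);
        }
        return builderMethod.substring(4);
    }

    /** Builds the full property key for one stream of one system. */
    public static String key(String system, String stream, String builderMethod) {
        return "systems." + system + ".streams." + stream + ".aws.kcl." + paramName(builderMethod);
    }

    public static void main(String[] args) {
        // Placeholder system/stream names and values taken from the doc's examples.
        Map<String, String> config = new LinkedHashMap<>();
        config.put(key("kinesis-system", "input0", "withTableName"), "my-app-table-name");
        config.put(key("kinesis-system", "input0", "withInitialPositionInStream"), "TRIM_HORIZON");
        config.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Running main prints the two property lines in the same form as the "Resetting Offsets" example in the doc, which can then be copied into a job's properties file.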

http://git-wip-us.apache.org/repos/asf/samza/blob/11471672/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index d6b64e1..193297c 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -53,7 +53,7 @@ title: Documentation
   <li><a href="jobs/configuration.html">Apache Kafka</a></li>
   <li><a href="jobs/packaging.html">Apache Hadoop</a></li>
   <li><a href="jobs/yarn-jobs.html">Azure EventHubs</a></li>
-  <li><a href="jobs/logging.html">AWS Kinesis</a></li>
+  <li><a href="aws/kinesis.html">AWS Kinesis</a></li>
 </ul>
 
 <h4>Operations</h4>