Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/30 19:11:58 UTC

samza git commit: atoomula and prateekm FYI..

Repository: samza
Updated Branches:
  refs/heads/master dcd4b558a -> de2b97f89


atoomula and prateekm FYI..

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #785 from vjagadish1989/website-reorg28


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/de2b97f8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/de2b97f8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/de2b97f8

Branch: refs/heads/master
Commit: de2b97f8976651946a2d1220259798e459d23a53
Parents: dcd4b55
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Oct 30 12:11:56 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Oct 30 12:11:56 2018 -0700

----------------------------------------------------------------------
 .../versioned/connectors/kinesis.md             | 104 +++++++++++--------
 1 file changed, 61 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/de2b97f8/docs/learn/documentation/versioned/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kinesis.md b/docs/learn/documentation/versioned/connectors/kinesis.md
index a866484..85149f6 100644
--- a/docs/learn/documentation/versioned/connectors/kinesis.md
+++ b/docs/learn/documentation/versioned/connectors/kinesis.md
@@ -19,11 +19,16 @@ title: Kinesis Connector
    limitations under the License.
 -->
 
-## Overview
+## Kinesis I/O: Quickstart
 
-The Samza Kinesis connector provides access to [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams),
-Amazon’s data streaming service. A Kinesis Data Stream is similar to a Kafka topic and can have multiple partitions.
-Each message consumed from a Kinesis Data Stream is an instance of [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
+The Samza Kinesis connector allows you to interact with [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams),
+Amazon’s data streaming service. The `hello-samza` project includes an example of processing Kinesis streams using Samza. Here is the complete [source code](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java) and [configs](https://github.com/apache/samza-hello-samza/blob/master/src/main/config/kinesis-hello-samza.properties).
+You can build and run this example using this [tutorial](https://github.com/apache/samza-hello-samza#hello-samza).
+
+
+### Data Format
+Like a Kafka topic, a Kinesis stream can have multiple shards with producers and consumers.
+Each message consumed from the stream is an instance of a Kinesis [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
 Samza’s [KinesisSystemConsumer](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java)
 wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java).
 
@@ -31,90 +36,103 @@ wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apac
 
 ### Basic Configuration
 
-You can configure your Samza jobs to process data from Kinesis Streams. To configure Samza job to consume from Kinesis
-streams, please add the below configuration:
+Here is the required configuration for consuming messages from Kinesis:
 
 {% highlight jproperties %}
-// define a kinesis system factory with your identifier. eg: kinesis-system
-systems.kinesis-system.samza.factory=org.apache.samza.system.eventhub.KinesisSystemFactory
+// Define a Kinesis system factory with your identifier, e.g. kinesis-system
+systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
 
-// kinesis system consumer works with only AllSspToSingleTaskGrouperFactory
+// The Kinesis consumer works only with AllSspToSingleTaskGrouperFactory
 job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
 
-// define your streams
+// Define your streams
 task.inputs=kinesis-system.input0
 
-// define required properties for your streams
+// Define required properties for your streams
 systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
 systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
 sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 {% endhighlight %}
 
-The tuple required to access the Kinesis data stream must be provided, namely the following fields:<br>
-**YOUR-STREAM-REGION**, **YOUR-ACCESS-KEY**, **YOUR-SECRET-KEY**.
+#### Coordination
+The Kinesis system consumer does not rely on Samza's coordination mechanism. Instead, it uses the Kinesis Client Library (KCL) to coordinate and to distribute the available shards among the running instances. Hence, you should
+set your grouper configuration to `AllSspToSingleTaskGrouperFactory`.
 
+{% highlight jproperties %}
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+{% endhighlight %}
 
-### Advanced Configuration
+#### Security
 
-#### AWS Client configs
-You can configure any [AWS client config](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html)
-with the prefix **systems.system-name.aws.clientConfig.***
+Each Kinesis stream in a given AWS [region](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) can be accessed by providing an [access key](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). An access key consists of two parts: an access key ID (for example, `AKIAIOSFODNN7EXAMPLE`) and a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`), which together you use to send programmatic requests to AWS.
 
 {% highlight jproperties %}
-systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
+systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
+systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
+sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 {% endhighlight %}
 
-As an example, to set a *proxy host* and *proxy port* for the AWS Client:
+### Advanced Configuration
+
+#### Kinesis Client Library Configs
+Samza Kinesis Connector uses the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
+(KCL) to access the Kinesis data streams. You can set any [KCL Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
+for a stream by configuring it with the **systems.system-name.streams.stream-name.aws.kcl.*** prefix.
 
 {% highlight jproperties %}
-systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
-systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
+systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}
 
-#### Kinesis Client Library Configs
-Samza Kinesis Connector uses [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
-(KCL) to access the Kinesis data streams. You can set any [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
-for a stream by configuring it under **systems.system-name.streams.stream-name.aws.kcl.***
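+For example, the configuration below is equivalent to calling `withTableName("myTable")` on the KCL's `KinesisClientLibConfiguration`. In general, the config param is the name of the corresponding `with*` method with its `with` prefix removed.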
+{% highlight jproperties %}
+systems.system-name.streams.stream-name.aws.kcl.TableName=myTable
+{% endhighlight %}
+
+#### AWS Client configs
+Samza allows you to specify any [AWS client config](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) used to connect to Kinesis.
+To do so, set the config with the `systems.system-name.aws.clientConfig.*` prefix.
 
 {% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
+systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}
 
-Obtain the config param from the public functions in [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
-by removing the *"with"* prefix. For example: config param corresponding to **withTableName()** is **TableName**.
+As an example, to set the *proxy host* and *proxy port* to be used by the Kinesis Client:
+{% highlight jproperties %}
+systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
+systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
+{% endhighlight %}
 
 ### Resetting Offsets
 
-The source of truth for checkpointing while using Kinesis Connector is not the Samza checkpoint topic but Kinesis itself.
-The Kinesis Client Library (KCL) [uses DynamoDB](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html)
-to store it’s checkpoints. By default, Kinesis Connector reads from the latest offset in the stream.
-
-To reset the checkpoints and consume from earliest/latest offset of a Kinesis data stream, please change the KCL TableName
-and set the appropriate starting position for the stream as shown below.
+Unlike other connectors, for which Samza stores and manages the checkpointed offsets, the Kinesis connector stores its checkpoints in a [DynamoDB](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html) table.
+These checkpoints are stored and managed internally by the KCL. You can reset them by configuring a different name for the DynamoDB table.
 
 {% highlight jproperties %}
-// change the TableName to a unique name to reset checkpoint.
+// change the TableName to a unique name to reset checkpoints.
 systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name
+{% endhighlight %}
+
+When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream.  
+
+{% highlight jproperties %}
 // set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
-systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=my-start-position
+systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=LATEST
 {% endhighlight %}
 
-To manipulate checkpoints to start from a particular position in the Kinesis stream, in lieu of Samza CheckpointTool,
-please login to the AWS Console and change the offsets in the DynamoDB Table with the table name that you have specified
-in the config above. By default, the table name has the following format:
-"\<job name\>-\<job id\>-\<kinesis stream\>".
+Alternatively, to start from a particular offset in the Kinesis stream, you can log in to the [AWS console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) and edit the offsets in your DynamoDB table.
+By default, the table name has the following format: "\<job name\>-\<job id\>-\<kinesis stream\>".
 
 ### Known Limitations
 
-The following limitations apply to Samza jobs consuming from Kinesis streams using the Samza consumer:
+The following limitations apply to Samza jobs consuming from Kinesis streams:
 
 - Stateful processing (eg: windows or joins) is not supported on Kinesis streams. However, you can accomplish this by
 chaining two Samza jobs where the first job reads from Kinesis and sends to Kafka while the second job processes the
 data from Kafka.
 - Kinesis streams cannot be configured as [bootstrap](https://samza.apache.org/learn/documentation/latest/container/streams.html)
 or [broadcast](https://samza.apache.org/learn/documentation/latest/container/samza-container.html) streams.
-- Kinesis streams must be used ONLY with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java)
-as the Kinesis consumer does the partition management by itself. No other grouper is supported.
+- Kinesis streams must be used only with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java)
+as the Kinesis consumer does the partition management by itself. No other grouper is currently supported.
 - A Samza job that consumes from Kinesis cannot consume from any other input source. However, you can send your results
 to any destination (eg: Kafka, EventHubs), and have another Samza job consume them.
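
The configuration keys on the updated page follow a few mechanical rules. Per-stream settings live under `systems.<system>.streams.<stream>.aws.*`. A KCL param is the `KinesisClientLibConfiguration` setter name with its `with` prefix removed. The default checkpoint table is named `<job name>-<job id>-<kinesis stream>`. The Java sketch below illustrates those rules by assembling the documented properties into a plain map. `KinesisConfigSketch` and all of its methods are hypothetical helpers and are not part of Samza's API. The sketch also assumes that the default table name is a simple hyphen-joined string, exactly as documented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Samza: assembles the Kinesis consumer
// properties described in kinesis.md into a plain key/value map.
class KinesisConfigSketch {

  // "withTableName" -> "TableName": strip the "with" prefix of a
  // KinesisClientLibConfiguration setter to get the config param name.
  static String kclParamFromSetter(String setterName) {
    if (!setterName.startsWith("with") || setterName.length() <= 4) {
      throw new IllegalArgumentException("Not a with* setter: " + setterName);
    }
    return setterName.substring(4);
  }

  // Full key under the systems.<system>.streams.<stream>.aws.kcl.* prefix.
  static String kclKey(String system, String stream, String setterName) {
    return "systems." + system + ".streams." + stream + ".aws.kcl."
        + kclParamFromSetter(setterName);
  }

  // Assumption: the documented default "<job name>-<job id>-<kinesis stream>"
  // is a plain hyphen join of the three parts.
  static String defaultTableName(String jobName, String jobId, String stream) {
    return jobName + "-" + jobId + "-" + stream;
  }

  // The required settings from "Basic Configuration" for one input stream.
  static Map<String, String> requiredConfig(String system, String stream,
      String region, String accessKey, String secretKey) {
    Map<String, String> cfg = new LinkedHashMap<>();
    cfg.put("systems." + system + ".samza.factory",
        "org.apache.samza.system.kinesis.KinesisSystemFactory");
    // The KCL distributes shards itself, so only this grouper is supported.
    cfg.put("job.systemstreampartition.grouper.factory",
        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
    cfg.put("task.inputs", system + "." + stream);
    String prefix = "systems." + system + ".streams." + stream + ".aws.";
    cfg.put(prefix + "region", region);
    cfg.put(prefix + "accessKey", accessKey);
    // The secret key uses the "sensitive." prefix, as in the documented config.
    cfg.put("sensitive." + prefix + "secretKey", secretKey);
    return cfg;
  }

  public static void main(String[] args) {
    Map<String, String> cfg = requiredConfig("kinesis-system", "input0",
        "YOUR-STREAM-REGION", "YOUR-ACCESS_KEY", "YOUR-SECRET-KEY");
    // Reset checkpoints by pointing the KCL at a new DynamoDB table name...
    cfg.put(kclKey("kinesis-system", "input0", "withTableName"), "my-app-table-name");
    // ...and start from TRIM_HORIZON (oldest) or LATEST (latest).
    cfg.put(kclKey("kinesis-system", "input0", "withInitialPositionInStream"), "LATEST");
    cfg.forEach((k, v) -> System.out.println(k + "=" + v));
    System.out.println("# default table: " + defaultTableName("my-job", "1", "input0"));
  }
}
```

Running `main` prints the keys in insertion order in `key=value` form. In a real job, these entries would normally be written directly into the job's `.properties` file rather than built in code.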