Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/29 19:36:40 UTC

[1/8] samza git commit: Fixing broken link between Yarn Host Affinity and Resource Localizati…

Repository: samza
Updated Branches:
  refs/heads/0.14.0 fae73b2c8 -> 04ee7fee6


Fixing broken link between Yarn Host Affinity and Resource Localizati…

Fixing broken link between Yarn Host Affinity and Resource Localization pages under Documentation

Patch needs to be back-ported to 0.13.0 website!

Author: navina <na...@apache.org>

Reviewers: Jake Maes <jm...@gmail.com>

Closes #302 from navina/website-link-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a5a69c4d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a5a69c4d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a5a69c4d

Branch: refs/heads/0.14.0
Commit: a5a69c4df6ea5ba179240d90086dd7ac539d5717
Parents: a322972
Author: navina <na...@apache.org>
Authored: Mon Nov 27 13:49:35 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Mon Nov 27 13:49:35 2017 -0800

----------------------------------------------------------------------
 docs/learn/documentation/versioned/yarn/yarn-host-affinity.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a5a69c4d/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md b/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
index fea9522..f5edab7 100644
--- a/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
+++ b/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
@@ -119,4 +119,4 @@ As you have observed, host-affinity cannot be guaranteed all the time due to var
 1. _When the number of containers and/or container-task assignment changes across successive application runs_ - We may be able to re-use local state for a subset of partitions. Currently, there is no logic in the Job Coordinator to handle partitioning of tasks among containers intelligently. Handling this is more involved as relates to [auto-scaling](https://issues.apache.org/jira/browse/SAMZA-336) of the containers. However, with [task-container mapping](https://issues.apache.org/jira/browse/SAMZA-906), this will work better for typical container count adjustments.
 2. _When SystemStreamPartitionGrouper changes across successive application runs_ - When the grouper logic used to distribute the partitions across containers changes, the data in the Coordinator Stream (for changelog-task partition assignment etc) and the data stores becomes invalid. Thus, to be safe, we should flush out all state-related data from the Coordinator Stream. An alternative is to overwrite the Task-ChangelogPartition assignment message and the Container Locality message in the Coordinator Stream, before starting up the job again.
 
-## [Resource Localization &raquo;](../operations/resource-localization.html)
\ No newline at end of file
+## [Resource Localization &raquo;](../yarn/yarn-resource-localization.html)
\ No newline at end of file


[5/8] samza git commit: SAMZA-1412 replace mockito-all with mockito-core

Posted by xi...@apache.org.
SAMZA-1412 replace mockito-all with mockito-core

ran "./gradlew clean check" and all tests passed

Author: Fred Ji <ha...@gmail.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #365 from fredji97/mockito-core


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d262f666
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d262f666
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d262f666

Branch: refs/heads/0.14.0
Commit: d262f66635c39cb71a570d8438f48cdf304b07a5
Parents: 9961023
Author: Fred Ji <ha...@gmail.com>
Authored: Tue Nov 28 14:36:24 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 28 14:36:24 2017 -0800

----------------------------------------------------------------------
 build.gradle | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d262f666/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index eddb11c..50cc5e0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -136,7 +136,7 @@ project(':samza-api') {
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
   }
   checkstyle {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
@@ -178,7 +178,7 @@ project(":samza-core_$scalaVersion") {
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
     testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
     testCompile "org.powermock:powermock-core:$powerMockVersion"
     testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
@@ -204,7 +204,7 @@ project(':samza-azure') {
     compile project(":samza-core_$scalaVersion")
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
     testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
     testCompile "org.powermock:powermock-core:$powerMockVersion"
     testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
@@ -284,7 +284,7 @@ project(":samza-autoscaling_$scalaVersion") {
     }
     compile "org.apache.httpcomponents:httpclient:$httpClientVersion"
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
   }
 
@@ -304,7 +304,7 @@ project(':samza-elasticsearch') {
     compile "com.google.guava:guava:$guavaVersion"
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
 
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
@@ -322,7 +322,7 @@ project(':samza-sql') {
 
     testCompile project(":samza-test_$scalaVersion")
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
 
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
@@ -359,7 +359,7 @@ project(":samza-kafka_$scalaVersion") {
     testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
     testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
 
@@ -450,7 +450,7 @@ project(":samza-yarn_$scalaVersion") {
     compile "joda-time:joda-time:$jodaTimeVersion"
     compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
   }
@@ -553,7 +553,7 @@ project(":samza-kv_$scalaVersion") {
     compile project(":samza-core_$scalaVersion")
     compile "org.scala-lang:scala-library:$scalaLibVersion"
     testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
   }
 
   checkstyle {
@@ -658,7 +658,7 @@ project(":samza-rest") {
 
     testCompile "junit:junit:$junitVersion"
     testCompile "org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:$jerseyVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
   }
 
   tasks.create(name: "releaseRestServiceTar", type: Tar) {
@@ -731,7 +731,7 @@ project(":samza-test_$scalaVersion") {
     testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
-    testCompile "org.mockito:mockito-all:$mockitoVersion"
+    testCompile "org.mockito:mockito-core:$mockitoVersion"
     testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
   }
 


[2/8] samza git commit: SAMZA-1513; Doc updates for persistent windows, joins and serdes.

Posted by xi...@apache.org.
SAMZA-1513; Doc updates for persistent windows, joins and serdes.

jmakes prateekm nickpan47 for review.

Author: Jagadish <jv...@linkedin.com>

Reviewers: Jake Maes<jm...@linkedin.com>

Closes #369 from vjagadish1989/doc-updates


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5e68d621
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5e68d621
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5e68d621

Branch: refs/heads/0.14.0
Commit: 5e68d621aa841feaf1650be64ac723c12b996df5
Parents: a5a69c4
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Nov 28 12:30:35 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 28 12:30:35 2017 -0800

----------------------------------------------------------------------
 .../versioned/hello-samza-high-level-code.md    | 87 ++++++++++----------
 docs/startup/preview/index.md                   | 67 ++++++++-------
 2 files changed, 80 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5e68d621/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
index 6c0526e..2f6a4a6 100644
--- a/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
+++ b/docs/learn/tutorials/versioned/hello-samza-high-level-code.md
@@ -108,26 +108,13 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092
 systems.kafka.default.stream.replication.factor=1
-systems.kafka.default.stream.samza.msg.serde=json
 {% endhighlight %}
 
 The above configuration defines 2 systems; one called _wikipedia_ and one called _kafka_.
 
 A factory is required for each system, so the _systems.system-name.samza.system.factory_ property is required for both systems. The other properties are system and use-case specific.
 
-For the _kafka_ system, we set the default replication factor to 1 for all streams because this application is intended for a demo deployment which utilizes a Kafka cluster with only 1 broker, so a replication factor larger than 1 is invalid. The default serde is JSON, which means by default any streams consumed or produced to the _kafka_ system will use a _json_ serde, which we will define in the next section.
-
-The _wikipedia_ system does not need a serde because the `WikipediaConsumer` already produces a usable type.
-
-#### Serdes
-Next, we need to configure the [serdes](/learn/documentation/{{site.version}}/container/serialization.html) we will use for streams and stores in the application.
-{% highlight bash %}
-serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
-{% endhighlight %}
-
-The _json_ serde was used for the _kafka_ system above. The _string_ and _integer_ serdes will be used later.
+For the _kafka_ system, we set the default replication factor to 1 for all streams because this application is intended for a demo deployment which utilizes a Kafka cluster with only 1 broker, so a replication factor larger than 1 is invalid.
 
 #### Configure Streams
 Samza identifies streams using a unique stream ID. In most cases, the stream ID is the same as the actual stream name. However, if a stream has a name that doesn't match the pattern `[A-Za-z0-9_-]+`, we need to configure a separate _physical.name_ to associate the actual stream name with a legal stream ID. The Wikipedia channels we will consume have a '#' character in the names. So for each of them we must pick a legal stream ID and then configure the physical name to match the channel.
@@ -208,16 +195,15 @@ Next, we will declare the input streams for the Wikipedia application.
 #### Inputs
 The Wikipedia application consumes events from three channels. Let's declare each of those channels as an input streams via the [StreamGraph](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html) in the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-) method.
 {% highlight java %}
-MessageStream<WikipediaFeedEvent> wikipediaEvents = streamGraph.getInputStream("en-wikipedia", (k, v) -> (WikipediaFeedEvent) v);
-MessageStream<WikipediaFeedEvent> wiktionaryEvents = streamGraph.getInputStream("en-wiktionary", (k, v) -> (WikipediaFeedEvent) v);
-MessageStream<WikipediaFeedEvent> wikiNewsEvents = streamGraph.getInputStream("en-wikinews", (k, v) -> (WikipediaFeedEvent) v);
+MessageStream<WikipediaFeedEvent> wikipediaEvents = streamGraph.getInputStream("en-wikipedia", new NoOpSerde<>());
+MessageStream<WikipediaFeedEvent> wiktionaryEvents = streamGraph.getInputStream("en-wiktionary", new NoOpSerde<>());
+MessageStream<WikipediaFeedEvent> wikiNewsEvents = streamGraph.getInputStream("en-wikinews", new NoOpSerde<>());
 {% endhighlight %}
 
-The first argument to the [getInputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getInputStream-java.lang.String-java.util.function.BiFunction-) method is the stream ID. Each ID must match the corresponding stream IDs we configured earlier.
-
-The second argument is the *message builder*. It converts the input key and message to the appropriate type. In this case, we don't have a key and want to sent the events as-is, so we have a very simple builder that just forwards the input value.
+The first argument to the [getInputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getInputStream-java.lang.String-org.apache.samza.serializers.Serde-) method is the stream ID. Each ID must match the corresponding stream IDs we configured earlier.
+The second argument is the `Serde` used to deserialize the message. We've set this to a `NoOpSerde` since our `wikipedia` system already returns `WikipediaFeedEvent`s and there is no need for further deserialization.
 
-Note the streams are all MessageStreams of type WikipediaFeedEvent. [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html) is the in-memory representation of a stream in Samza. It uses generics to ensure type safety across the streams and operations. We knew the WikipediaFeedEvent type by inspecting the WikipediaConsumer above and we made it explicit with the cast on the output of the MessageBuilder. If our inputs used a serde, we would know the type based on which serde is configured for the input streams.
+Note the streams are all MessageStreams of type WikipediaFeedEvent. [MessageStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html) is the in-memory representation of a stream in Samza. It uses generics to ensure type safety across the streams and operations.
 
 #### Merge
 We'd like to use the same processing logic for all three input streams, so we will use the [mergeAll](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-) operator to merge them together. Note: this is not the same as a [join](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#join-org.apache.samza.operators.MessageStream-org.apache.samza.operators.functions.JoinFunction-java.time.Duration-) because we are not associating events by key. We are simply combining three streams into one, like a union.
@@ -279,7 +265,7 @@ Note: the type parameters for [FoldLeftFunction](/learn/documentation/{{site.ver
 Finally, we can define our [window](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/Window.html) back in the [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-) method by chaining the result of the parser:
 {% highlight java %}
 allWikipediaEvents.map(WikipediaParser::parseEvent)
-        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator()));
+        .window(Windows.tumblingWindow(Duration.ofSeconds(10), WikipediaStats::new, new WikipediaStatsAggregator(), new JsonSerdeV2<>(WikipediaStats.class)));
 {% endhighlight %}
 
 This defines an unkeyed [tumbling window](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/Windows.html) that spans 10s, which instantiates a new `WikipediaStats` object at the beginning of each window and aggregates the stats using `WikipediaStatsAggregator`.
@@ -287,20 +273,32 @@ This defines an unkeyed [tumbling window](/learn/documentation/{{site.version}}/
 The output of the window is a [WindowPane](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/windows/WindowPane.html) with a key and value. Since we used an unkeyed tumbling window, the key is `Void`. The value is our `WikipediaStats` object.
 
 #### Output
-We want to use a JSON serializer to output the window values to Kafka, so we will do one more [map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-) to format the output.
+We will do a [map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-) at the end to format our window output. Let's begin by defining a simple container class for our formatted output.
 
-First, let's define the method to format the stats as a `Map<String, String>` so the _json_ serde can handle it. Paste the following after the aggregator class:
 {% highlight java %}
-private Map<String, Integer> formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
-  WikipediaStats stats = statsWindowPane.getMessage();
-
-  Map<String, Integer> counts = new HashMap<String, Integer>(stats.counts);
-  counts.put("edits", stats.edits);
-  counts.put("bytes-added", stats.byteDiff);
-  counts.put("unique-titles", stats.titles.size());
+  static class WikipediaStatsOutput {
+    public int edits;
+    public int bytesAdded;
+    public int uniqueTitles;
+    public Map<String, Integer> counts;
+
+    public WikipediaStatsOutput(int edits, int bytesAdded, int uniqueTitles,
+        Map<String, Integer> counts) {
+      this.edits = edits;
+      this.bytesAdded = bytesAdded;
+      this.uniqueTitles = uniqueTitles;
+      this.counts = counts;
+    }
+  }
+{% endhighlight %}
 
-  return counts;
-}
+Paste the following after the aggregator class:
+{% highlight java %}
+  private WikipediaStatsOutput formatOutput(WindowPane<Void, WikipediaStats> statsWindowPane) {
+    WikipediaStats stats = statsWindowPane.getMessage();
+    return new WikipediaStatsOutput(
+        stats.edits, stats.byteDiff, stats.titles.size(), stats.counts);
+  }
 {% endhighlight %}
 
 Now, we can invoke the method by adding another [map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-) operation to the chain in [init](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/application/StreamApplication.html#init-org.apache.samza.operators.StreamGraph-org.apache.samza.config.Config-). The operator chain should now look like this:
@@ -312,15 +310,14 @@ allWikipediaEvents.map(WikipediaParser::parseEvent)
 
 Next we need to get the output stream to which we will send the stats. Insert the following line below the creation of the 3 input streams:
 {% highlight java %}
-OutputStream<Void, Map<String, Integer>, Map<String, Integer>>
-        wikipediaStats = streamGraph.getOutputStream("wikipedia-stats", m -> null, m -> m);
+  OutputStream<WikipediaStatsOutput> wikipediaStats =
+     graph.getOutputStream("wikipedia-stats", new JsonSerdeV2<>(WikipediaStatsOutput.class));
 {% endhighlight %}
 
-The [OutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/OutputStream.html) is parameterized by 3 types; the key type for the output, the value type for the output, and upstream type.
-
-The first parameter of [getOutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-java.util.function.Function-java.util.function.Function-) is the output stream ID. We will use _wikipedia-stats_ and since it contains no special characters, we won't bother configuring a physical name so Samza will use the stream ID as the topic name.
+The [OutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/OutputStream.html) is parameterized by the type of the output.
 
-The second and third parameters are the *key extractor* and the *message extractor*, respectively. We have no key, so the *key extractor* simply produces null. The *message extractor* simply passes the message because it's already the correct type for the _json_ serde. Note: we could have skipped the previous [map](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#map-org.apache.samza.operators.functions.MapFunction-) operator and invoked our formatter here, but we kept them separate for pedagogical purposes.
+The first parameter of [getOutputStream](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/StreamGraph.html#getOutputStream-java.lang.String-org.apache.samza.serializers.Serde-) is the output stream ID. We will use _wikipedia-stats_ and since it contains no special characters, we won't bother configuring a physical name so Samza will use the stream ID as the topic name.
+The second parameter is the [Serde](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/serializers/Serde.html) to serialize the outgoing message. We will set it to [JsonSerdeV2](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/serializers/JsonSerdeV2.html) to serialize our `WikipediaStatsOutput` as a JSON string.
 
 Finally, we can send our output to the output stream using the [sendTo](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/MessageStream.html#sendTo-org.apache.samza.operators.OutputStream-) operator:
 {% highlight java %}
@@ -339,15 +336,18 @@ We will do this by keeping a separate count outside the window and persisting it
 
 We start by defining the store in the config file:
 {% highlight bash %}
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory
+
 stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
 stores.wikipedia-stats.key.serde=string
 stores.wikipedia-stats.msg.serde=integer
 {% endhighlight %}
 
-These properties declare a [RocksDB](http://rocksdb.org/) key-value store named "wikipedia-stats". The store is replicated to a changelog stream called "wikipedia-stats-changelog" on the _kafka_ system for durability. It uses the _string_ and _integer_ serdes you defined earlier for keys and values respectively.
+These properties declare a [RocksDB](http://rocksdb.org/) key-value store named "wikipedia-stats". The store is replicated to a changelog stream called "wikipedia-stats-changelog" on the _kafka_ system for durability. It uses the _string_ and _integer_ serdes for keys and values respectively.
 
-Next, we add a total count member variable to the `WikipediaStats` class:
+Next, we add a total count member variable to the `WikipediaStats` class, and to the `WikipediaStatsOutput` class:
 {% highlight java %}
 int totalEdits = 0;
 {% endhighlight %}
@@ -374,10 +374,7 @@ store.put("count-edits-all-time", editsAllTime);
 stats.totalEdits = editsAllTime;
 {% endhighlight %}
 
-Finally, update the `MyWikipediaApplication#formatOutput` method to include the total counter.
-{% highlight java %}
-counts.put("edits-all-time", stats.totalEdits);
-{% endhighlight %}
+Finally, update the `MyWikipediaApplication#formatOutput` method to include the total counter in its `WikipediaStatsOutput`.
 
 #### Metrics
 Lastly, let's add a metric to the application which counts the number of repeat edits each topic within the window interval.
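
Note on the last hunk above: the commit removes the one-line snippet that showed how `formatOutput` picked up the all-time counter, and the new text only says to include it in `WikipediaStatsOutput`. A minimal sketch of what that update might look like follows. It is an illustration under assumptions, not the tutorial's actual code: the `totalEdits` field on `WikipediaStatsOutput`, the five-argument constructor and its argument order, and the `FormatOutputSketch` wrapper class are all hypothetical, and `formatOutput` takes a `WikipediaStats` directly rather than a `WindowPane<Void, WikipediaStats>` so that the example compiles without Samza on the classpath.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical wrapper class so the sketch compiles on its own.
public class FormatOutputSketch {

  // Stand-in for the tutorial's WikipediaStats aggregate (field names taken from the diff).
  static class WikipediaStats {
    int edits = 0;
    int byteDiff = 0;
    int totalEdits = 0; // all-time counter added in the "persistent count" step
    Set<String> titles = new HashSet<>();
    Map<String, Integer> counts = new HashMap<>();
  }

  // Container class from the diff, extended with an assumed totalEdits field.
  static class WikipediaStatsOutput {
    public int edits;
    public int bytesAdded;
    public int uniqueTitles;
    public int totalEdits; // assumption: not shown in the commit
    public Map<String, Integer> counts;

    public WikipediaStatsOutput(int edits, int bytesAdded, int uniqueTitles,
        int totalEdits, Map<String, Integer> counts) {
      this.edits = edits;
      this.bytesAdded = bytesAdded;
      this.uniqueTitles = uniqueTitles;
      this.totalEdits = totalEdits;
      this.counts = counts;
    }
  }

  // In the tutorial this method receives a WindowPane and calls getMessage() first.
  static WikipediaStatsOutput formatOutput(WikipediaStats stats) {
    return new WikipediaStatsOutput(
        stats.edits, stats.byteDiff, stats.titles.size(), stats.totalEdits,
        new HashMap<>(stats.counts)); // copy so the output does not alias window state
  }

  public static void main(String[] args) {
    WikipediaStats stats = new WikipediaStats();
    stats.edits = 3;
    stats.byteDiff = 120;
    stats.totalEdits = 45;
    stats.titles.add("Apache Samza");
    stats.titles.add("Apache Kafka");
    stats.counts.put("is-minor", 1);

    WikipediaStatsOutput out = formatOutput(stats);
    System.out.println("edits=" + out.edits + " totalEdits=" + out.totalEdits
        + " uniqueTitles=" + out.uniqueTitles);
  }
}
```

Because the output stream in the diff is declared with `new JsonSerdeV2<>(WikipediaStatsOutput.class)`, any public field added to the container class this way would presumably appear in the serialized JSON without further changes to the operator chain.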

http://git-wip-us.apache.org/repos/asf/samza/blob/5e68d621/docs/startup/preview/index.md
----------------------------------------------------------------------
diff --git a/docs/startup/preview/index.md b/docs/startup/preview/index.md
index b1ca0ac..ceccc39 100644
--- a/docs/startup/preview/index.md
+++ b/docs/startup/preview/index.md
@@ -127,10 +127,10 @@ Since the 0.13.0 release, Samza provides a new high level API that simplifies yo
 
 Check out some examples to see the high-level API in action.
 
-1.  [Pageview AdClick Joiner](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java) demonstrates joining a stream of PageViews with a stream of AdClicks, e.g. to analyze which pages get the most ad clicks.
-2.  [Pageview Repartitioner](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewFilterApp.java) illustrates re-partitioning the incoming stream of PageViews.
-3.  [Pageview Sessionizer](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java) groups the incoming stream of events into sessions based on user activity.
-4.  [Pageview by Region](https://github.com/apache/samza-hello-samza/blob/c87ed565fbaebf2ac88376143c65e9f52f7a8801/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java) counts the number of views per-region over tumbling time intervals.
+1.  [Pageview AdClick Joiner](https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java) demonstrates joining a stream of PageViews with a stream of AdClicks, e.g. to analyze which pages get the most ad clicks.
+2.  [Pageview Repartitioner](https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewFilterApp.java) illustrates re-partitioning the incoming stream of PageViews.
+3.  [Pageview Sessionizer](https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java) groups the incoming stream of events into sessions based on user activity.
+4.  [Pageview by Region](https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java) counts the number of views per-region over tumbling time intervals.
 
 
 ## Key Concepts
@@ -147,11 +147,11 @@ For example, here is a StreamApplication that validates and decorates page views
 public class BadPageViewFilter implements StreamApplication {
   @Override
 public void init(StreamGraph graph, Config config) {
-    MessageStream<PageView> pageViews = graph.getInputStream(“page-views”..);
+    MessageStream<PageView> pageViews = graph.getInputStream(“page-views”, new JsonSerdeV2<>(PageView.class));
 
     pageViews.filter(this::isValidPageView)
                       .map(this::addProfileInformation)
-                      .sendTo(graph.getOutputStream(“decorated-page-views”..))
+                      .sendTo(graph.getOutputStream(“decorated-page-views”, new JsonSerdeV2<>(DecoratedPageView.class)))
  }
 }
 {% endhighlight %}
@@ -169,7 +169,7 @@ There are 3 simple steps to write your stream processing applications using the
 ### Step 1: Obtain the input streams:
 You can obtain the MessageStream for your input stream ID (“page-views”) using StreamGraph.getInputStream.
     {% highlight java %}
-    MessageStream<PageView> pageViewInput = graph.getInputStream(“page-views”, (k,v) -> v);
+    MessageStream<PageView> pageViewInput = graph.getInputStream(“page-views”, new JsonSerdeV2<>(PageView.class));
     {% endhighlight %}
 
 The first parameter `page-views` is the logical stream ID. Each stream ID is associated with a *physical name* and a *system*. By default, Samza uses the stream ID as the physical stream name and accesses the stream on the default system which is specified with the property “job.default.system”. However, the *physical name* and *system* properties can be overridden in configuration. For example, the following configuration defines the stream ID "page-views" as an alias for the PageViewEvent topic in a local Kafka cluster.
@@ -182,7 +182,7 @@ systems.kafka.producer.bootstrap.servers=localhost:9092
 streams.page-views.samza.physical.name=PageViewEvent
 {% endhighlight %}
 
-The second parameter `(k,v) -> v` is the MessageBuilder function that is used to construct a message from the incoming key and value.
+The second parameter is a serde to de-serialize the incoming message.
 
 ### Step 2: Define your transformation logic:
 You are now ready to define your StreamApplication logic as a series of transformations on MessageStreams.
@@ -200,8 +200,7 @@ Finally, you can create an OutputStream using StreamGraph.getOutputStream and se
 // Send messages with userId as the key to “decorated-page-views”.
 decoratedPageViews.sendTo(
                           graph.getOutputStream(“decorated-page-views”,
-                                                dpv -> dpv.getUserId(),
-                                                dpv -> dpv));
+                                                new JsonSerdeV2<>(DecoratedPageView.class)));
 {% endhighlight %}
 
 The first parameter `decorated-page-views` is a logical stream ID. The properties for this stream ID can be overridden just like the stream IDs for input streams. For example:
@@ -210,7 +209,7 @@ streams.decorated-page-views.samza.system=kafka
 streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent
 {% endhighlight %}
 
-The second and third parameters define extractors to split the upstream data type into a separate key and value, respectively.
+The second parameter is a serde to serialize the outgoing message.
 
 ## Operators
 The high level API supports common operators like map, flatmap, filter, merge, joins, and windowing on streams. Most of these operators accept corresponding Functions and these functions are [Initable](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html).
@@ -247,12 +246,17 @@ MessageStream<String> shortWords = words.filter(word -> word.size() < 3);
 
 ### PartitionBy
 Re-partitions this MessageStream using the key returned by the provided keyExtractor and returns the transformed MessageStream. Messages are sent through an intermediate stream during repartitioning.
-
 {% highlight java %}
-// Repartition pageView by userId
 MessageStream<PageView> pageViews = ...
-MessageStream<PageView> partitionedPageViews =
-                                        pageViews.partitionBy(pageView -> pageView.getUserId())
+// Repartition pageView by userId.
+MessageStream<KV<String, PageView>> partitionedPageViews =
+                                        pageViews.partitionBy(pageView -> pageView.getUserId(), // key extractor
+                                        pageView -> pageView, // value extractor
+                                        KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class)), // serdes
+                                        "partitioned-page-views") // operator ID
+
+// The operator ID should be unique for an operator within the application and is used to identify the streams and stores created by the operator.
+
 {% endhighlight %}
 
 ### Merge
@@ -275,11 +279,11 @@ Sends all messages from this MessageStream to the provided OutputStream. You can
 {% highlight java %}
 // Output a new message with userId as the key and region as the value to the “user-region” stream.
 MessageStream<PageView> pageViews = ...
-OutputStream<String, String, PageView> userRegions
+MessageStream<KV<String, String>> keyedPageViews = pageViews.map(pageView -> KV.of(pageView.getUserId(), pageView.getRegion()));
+OutputStream<KV<String, String>> userRegions
                            = graph.getOutputStream(“user-region”,
-                                                   pageView -> pageView.getUserId(),
-                                                   pageView -> pageView.getRegion())
-pageView.sendTo(userRegions);
+                                                   KVSerde.of(new StringSerde(), new StringSerde()));
+keyedPageViews.sendTo(userRegions);
 {% endhighlight %}
 
 
@@ -308,7 +312,11 @@ The Join operator joins messages from two MessageStreams using the provided pair
 MessageStream<OrderRecord> orders = …
 MessageStream<ShipmentRecord> shipments = …
 
-MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(), Duration.ofMinutes(20) )
+MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(),
+    new StringSerde(), // serde for the join key
+    new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), // serde for both streams
+    Duration.ofMinutes(20), // join TTL
+    "shipped-order-stream") // operator ID
 
 // Constructs a new FulfilledOrderRecord by extracting the order timestamp from the OrderRecord and the shipment timestamp from the ShipmentRecord.
  class OrderShipmentJoiner implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
@@ -358,16 +366,17 @@ Examples:
 MessageStream<PageView> pageViews = ...
 MessageStream<WindowPane<String, Collection<PageView>>> =
                      pageViews.window(
-                         Windows.keyedTumblingWindow(pageView -> pageView.getUserId(),
-                           Duration.ofSeconds(30)))
+                         Windows.keyedTumblingWindow(pageView -> pageView.getUserId(), // key extractor
+                           Duration.ofSeconds(30), // window duration
+                           new StringSerde(), new JsonSerdeV2<>(PageView.class)));
 
 
 // Compute the maximum value over tumbling windows of 3 seconds.
 MessageStream<Integer> integers = …
-Supplier<Integer> initialValue = () -> Integer.MIN_VALUE
-FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue)
+Supplier<Integer> initialValue = () -> Integer.MIN_VALUE;
+FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue);
 MessageStream<WindowPane<Void, Integer>> windowedStream =
-         integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), initialValue, aggregateFunction))
+         integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), initialValue, aggregateFunction, new IntegerSerde()));
 
 {% endhighlight %}
 
@@ -383,7 +392,8 @@ Supplier<Integer> initialValue = () -> 0
 FoldLeftFunction<PageView, Integer> countAggregator = (pageView, oldCount) -> oldCount + 1;
 Duration sessionGap = Duration.ofMinutes(3);
 MessageStream<WindowPane<String, Integer> sessionCounts = pageViews.window(Windows.keyedSessionWindow(
-    pageView -> pageView.getUserId(), sessionGap, initialValue, countAggregator));
+    pageView -> pageView.getUserId(), sessionGap, initialValue, countAggregator,
+        new StringSerde(), new IntegerSerde()));
 
 // Compute the maximum value over tumbling windows of 3 seconds.
 MessageStream<Integer> integers = …
@@ -391,12 +401,11 @@ Supplier<Integer> initialValue = () -> Integer.MAX_INT
 
 FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue)
 MessageStream<WindowPane<Void, Integer>> windowedStream =
-     integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), initialValue, aggregateFunction))
+     integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), initialValue, aggregateFunction,
+         new IntegerSerde()))
 
 {% endhighlight %}
 
-### Known Issues
-Currently, both window and join operators buffer messages in-memory. So, messages could be lost on failures and re-starts.
 
 ---
 


[7/8] samza git commit: SAMZA-1516: Another round of issues found by BEAM tests

Posted by xi...@apache.org.
SAMZA-1516: Another round of issues found by BEAM tests

Two more fixes: (1) fix a bug in identifying the input streams of an operator; (2) in partitionBy, set the partitionKey to 0L when the key is null.
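
The second fix can be illustrated in isolation. The following is a minimal sketch, not the Samza implementation: the class and method names are hypothetical, and only the ternary expression is taken from the PartitionByOperatorImpl change in this commit. It shows that a null message key maps to the fixed partition key 0L, while a non-null key leaves the partition key unset (presumably so the message key itself drives partitioning downstream; that part is an assumption and is not shown in the diff).

```java
// Hypothetical stand-alone sketch of the partition-key choice made in partitionBy.
final class PartitionKeySketch {

    // Same expression as in the patch: a null key falls back to partition key 0L;
    // for a non-null key no explicit partition key is set (null is returned).
    static Long choosePartitionKey(Object key) {
        return key == null ? 0L : null;
    }

    public static void main(String[] args) {
        System.out.println(choosePartitionKey(null));      // prints "0"
        System.out.println(choosePartitionKey("user-42")); // prints "null"
    }
}
```

With this choice, all messages whose key is null share one partition key instead of being sent with neither a key nor a partition key.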

Author: xiliu <xi...@xiliu-ld1.linkedin.biz>

Reviewers: Jagadish V <vj...@gmail.com>

Closes #370 from xinyuiscool/SAMZA-1516


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/12e61e98
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/12e61e98
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/12e61e98

Branch: refs/heads/0.14.0
Commit: 12e61e98c18a50cb04e368201d3a5797bc4eb7b0
Parents: 3b2a1fa
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Wed Nov 29 11:34:28 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Wed Nov 29 11:34:28 2017 -0800

----------------------------------------------------------------------
 .../samza/operators/impl/OperatorImplGraph.java       | 14 +++++++++++---
 .../samza/operators/impl/PartitionByOperatorImpl.java |  3 ++-
 2 files changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/12e61e98/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 49b29c8..0bb12d2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -155,6 +155,7 @@ public class OperatorImplGraph {
    */
   OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
       SystemStream inputStream, Config config, TaskContext context) {
+
     if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
@@ -174,9 +175,16 @@ public class OperatorImplGraph {
         });
       return operatorImpl;
     } else {
-      // the implementation corresponding to operatorSpec has already been instantiated
-      // and registered, so we do not need to traverse the DAG further.
-      return operatorImpls.get(operatorSpec.getOpId());
+      // the implementation corresponding to operatorSpec has already been instantiated and registered.
+      OperatorImpl operatorImpl = operatorImpls.get(operatorSpec.getOpId());
+      operatorImpl.registerInputStream(inputStream);
+
+      // We still need to traverse the DAG further to register the input streams.
+      Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
+      registeredSpecs.forEach(registeredSpec -> {
+          createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
+        });
+      return operatorImpl;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/12e61e98/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 424c10f..b3fb4b2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -77,7 +77,8 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
       TaskCoordinator coordinator) {
     K key = keyFunction.apply(message);
     V value = valueFunction.apply(message);
-    collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
+    Long partitionKey = key == null ? 0L : null;
+    collector.send(new OutgoingMessageEnvelope(systemStream, partitionKey, key, value));
     return Collections.emptyList();
   }
 


[6/8] samza git commit: SAMZA-1514: Prevent changelog stream names with empty strings.

Posted by xi...@apache.org.
SAMZA-1514: Prevent changelog stream names with empty strings.

This fix prevents changelog stream names with empty or whitespace-only strings.
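
The effect of the change can be sketched without Samza or commons-lang3 on the classpath. This is an illustrative approximation only: `trimToNull` below is a local stand-in for `StringUtils.trimToNull`, `resolve` is a hypothetical helper that loosely follows the comments in JavaStorageConfig, and the exception thrown when no default system is configured is an assumption (the real code uses its own exception type).

```java
// Hypothetical sketch of changelog stream name resolution after the fix.
final class ChangelogNameSketch {

    // Local stand-in for StringUtils.trimToNull: null, empty, and
    // whitespace-only inputs all become null.
    static String trimToNull(String s) {
        if (s == null) {
            return null;
        }
        String trimmed = s.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }

    // configured:    value of stores.<storename>.changelog (may be null or blank)
    // defaultSystem: value of job.changelog.system (may be null)
    static String resolve(String configured, String defaultSystem) {
        String stream = trimToNull(configured);
        if (stream == null) {
            return null; // blank values are now treated as "no changelog"
        }
        if (stream.contains(".")) {
            return stream; // already in <system>.<stream> form
        }
        if (defaultSystem == null) {
            // Assumption: the real code signals this case with its own exception.
            throw new IllegalStateException("changelog system is not defined");
        }
        return defaultSystem + "." + stream;
    }

    public static void main(String[] args) {
        System.out.println(resolve("", "kafka"));                 // prints "null"
        System.out.println(resolve(" ", "kafka"));                // prints "null"
        System.out.println(resolve("store1-changelog", "kafka")); // prints "kafka.store1-changelog"
        System.out.println(resolve("kafka2.mystream", "kafka"));  // prints "kafka2.mystream"
    }
}
```

The first two calls correspond to the `store1` and `store2` cases exercised by the new unit test in this commit.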

Author: Daniel Nishimura <dn...@gmail.com>

Reviewers: Jacob Maes <jm...@linkedin.com>

Closes #371 from dnishimura/samza-1514-empty-string-changelog


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3b2a1fa4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3b2a1fa4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3b2a1fa4

Branch: refs/heads/0.14.0
Commit: 3b2a1fa4398dc26f9270c405c01edaac59ba9fc1
Parents: d262f66
Author: Daniel Nishimura <dn...@gmail.com>
Authored: Tue Nov 28 16:19:45 2017 -0800
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Tue Nov 28 16:19:45 2017 -0800

----------------------------------------------------------------------
 .../org/apache/samza/config/JavaStorageConfig.java     |  5 +++--
 .../org/apache/samza/config/TestJavaStorageConfig.java | 13 +++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3b2a1fa4/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index 34e5683..48beec9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -21,6 +21,7 @@ package org.apache.samza.config;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.execution.StreamManager;
 
@@ -58,11 +59,11 @@ public class JavaStorageConfig extends MapConfig {
     // If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence.
     // If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
     // these values will be combined into <asystem>.<astream>
-    String systemStream = get(String.format(CHANGELOG_STREAM, storeName), null);
-    String changelogSystem = getChangelogSystem();
+    String systemStream = StringUtils.trimToNull(get(String.format(CHANGELOG_STREAM, storeName), null));
 
     String systemStreamRes;
     if (systemStream != null  && !systemStream.contains(".")) {
+      String changelogSystem = getChangelogSystem();
       // contains only stream name
       if (changelogSystem != null) {
         systemStreamRes = changelogSystem + "." + systemStream;

http://git-wip-us.apache.org/repos/asf/samza/blob/3b2a1fa4/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
index cc80430..c04d14f 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
@@ -68,4 +68,17 @@ public class TestJavaStorageConfig {
        // do nothing, it is expected
     }
   }
+
+  @Test
+  public void testEmptyStringOrNullChangelogStream() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("stores.store1.changelog", "");
+    configMap.put("stores.store2.changelog", " ");
+
+    JavaStorageConfig config = new JavaStorageConfig(new MapConfig(configMap));
+
+    assertNull(config.getChangelogStream("store1"));
+    assertNull(config.getChangelogStream("store2"));
+    assertNull(config.getChangelogStream("store-changelog-none"));
+  }
 }


[3/8] samza git commit: SAMZA-1515; Implement a consumer for Kinesis

Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
new file mode 100644
index 0000000..5b3b335
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+public class TestKinesisSystemFactory {
+  private static final String SYSTEM_FACTORY_REGEX = "systems.%s.samza.factory";
+  private static final String KINESIS_SYSTEM_FACTORY = KinesisSystemFactory.class.getName();
+
+  @Test
+  public void testGetConsumer() {
+    String systemName = "test";
+    Config config = buildKinesisConsumerConfig(systemName);
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    MetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+    Assert.assertNotSame(factory.getConsumer("test", config, metricsRegistry), factory.getAdmin(systemName, config));
+  }
+
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithIncorrectSspGrouper() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory");
+    factory.getAdmin(systemName, config);
+  }
+
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBroadcastStreams() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory",
+        "test.stream#0");
+    factory.getAdmin(systemName, config);
+  }
+
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBootstrapStream() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory",
+        null,
+        "kinesis-stream"
+        );
+    factory.getAdmin(systemName, config);
+  }
+
+  private static Config buildKinesisConsumerConfig(String systemName) {
+    return buildKinesisConsumerConfig(systemName, AllSspToSingleTaskGrouperFactory.class.getCanonicalName());
+  }
+
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, null);
+  }
+
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue, null);
+  }
+
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    Map<String, String> props = buildSamzaKinesisSystemConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue,
+        bootstrapStreamName);
+    return new MapConfig(props);
+  }
+
+  private static Map<String, String> buildSamzaKinesisSystemConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    Map<String, String> result = new HashMap<>();
+    result.put(String.format(SYSTEM_FACTORY_REGEX, systemName), KINESIS_SYSTEM_FACTORY);
+    result.put("job.systemstreampartition.grouper.factory", sspGrouperFactory);
+    if (broadcastStreamConfigValue != null && !broadcastStreamConfigValue.isEmpty()) {
+      result.put("task.broadcast.inputs", broadcastStreamConfigValue);
+    }
+    if (bootstrapStreamName != null && !bootstrapStreamName.isEmpty()) {
+      result.put("systems." + systemName + ".streams." + bootstrapStreamName + ".samza.bootstrap", "true");
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
new file mode 100644
index 0000000..6379fcc
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestKinesisRecordProcessor {
+  private static final long MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS =
+      KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS + 1000;
+
+  @Test
+  public void testLifeCycleWithEvents() throws InterruptedException, ShutdownException, InvalidStateException,
+                                               NoSuchFieldException, IllegalAccessException {
+    testLifeCycleHelper(5);
+  }
+
+  @Test
+  public void testLifeCycleWithNoEvents() throws InterruptedException, ShutdownException, InvalidStateException,
+                                                 NoSuchFieldException, IllegalAccessException {
+    testLifeCycleHelper(0);
+  }
+
+  private void testLifeCycleHelper(int numRecords) throws InterruptedException, ShutdownException,
+                                                          InvalidStateException, NoSuchFieldException,
+                                                          IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+
+    KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+      @Override
+      public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+        receivedRecordsLatch.countDown();
+      }
+
+      @Override
+      public void onShutdown(SystemStreamPartition ssp) {
+        receivedShutdownLatch.countDown();
+      }
+    };
+
+    KinesisRecordProcessor processor =
+        new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+    // Initialize the processor
+    ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+    InitializationInput initializationInput =
+        new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+    processor.initialize(initializationInput);
+
+    // Call processRecords on the processor
+    List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+    // Verification steps
+
+    // Verify there is a receivedRecords call to listener.
+    Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
+
+    if (numRecords > 0) {
+      // Call checkpoint on last record
+      processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+    }
+
+
+    // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+    shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+
+    // Verify that the processor is shutdown.
+    Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
+  }
+
+  /**
+   * Test the scenario where a processor instance is created for a shard and, while it is processing records, the shard
+   * is re-assigned to the same consumer. This results in a new processor instance owning the shard, and this instance
+   * could receive checkpoint calls for records that were processed by the old processor instance. This test covers
+   * the scenario where the new instance receives the checkpoint call after it has finished initialization but before
+   * it has processed any records.
+   */
+  @Test
+  public void testCheckpointAfterInit() throws InterruptedException, ShutdownException, InvalidStateException,
+                                               NoSuchFieldException, IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+
+    KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+      @Override
+      public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+      }
+
+      @Override
+      public void onShutdown(SystemStreamPartition ssp) {
+        receivedShutdownLatch.countDown();
+      }
+    };
+
+    KinesisRecordProcessor processor =
+        new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+    // Initialize the processor
+    ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+    InitializationInput initializationInput =
+        new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+    processor.initialize(initializationInput);
+
+    // Call checkpoint. This checkpoint could have originally headed to the processor instance for the same shard but
+    // due to reassignment a new processor instance is created.
+    processor.checkpoint("1234567");
+
+
+    // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+    shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+
+    // Verify that the processor is shutdown.
+    Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
+  }
+
+  @Test
+  public void testShutdownDuringReshardWithEvents() throws InterruptedException, ShutdownException,
+                                                           InvalidStateException, NoSuchFieldException,
+                                                           IllegalAccessException {
+    testShutdownDuringReshardHelper(5);
+  }
+
+  @Test
+  public void testShutdownDuringReshardWithNoEvents() throws InterruptedException, ShutdownException,
+                                                             InvalidStateException, NoSuchFieldException,
+                                                             IllegalAccessException {
+    testShutdownDuringReshardHelper(0);
+  }
+
+  private void testShutdownDuringReshardHelper(int numRecords)
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+
+    KinesisRecordProcessorListener listener = new KinesisRecordProcessorListener() {
+      @Override
+      public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+        receivedRecordsLatch.countDown();
+      }
+
+      @Override
+      public void onShutdown(SystemStreamPartition ssp) {
+        receivedShutdownLatch.countDown();
+      }
+    };
+
+    KinesisRecordProcessor processor =
+        new KinesisRecordProcessor(new SystemStreamPartition(system, stream, new Partition(0)), listener);
+
+    // Initialize the processor
+    ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+    InitializationInput initializationInput =
+        new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+    processor.initialize(initializationInput);
+
+    // Call processRecords on the processor
+    List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+    // Verification steps
+
+    // Verify there is a receivedRecords call to listener.
+    Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
+
+    // Call shutdown (with TERMINATE reason) on processor and verify that the processor does not call shutdown on the
+    // listener until checkpoint is called for the last record consumed from shard.
+    new Thread(() -> shutDownProcessor(processor, ShutdownReason.TERMINATE)).start();
+
+    // If there are no records, the processor should shutdown immediately.
+    if (numRecords == 0) {
+      Assert.assertTrue("Unable to shutdown processor.",
+          receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+      return;
+    }
+
+    Assert.assertFalse("Processor shutdown too early.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+    // Call checkpoint for the last but one record and the processor should still not call shutdown on listener.
+    processor.checkpoint(records.get(records.size() - 2).getSequenceNumber());
+    Assert.assertFalse("Processor shutdown too early.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+    // Call checkpoint for the last record and the processor should now call shutdown on the listener.
+    processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+    Assert.assertTrue("Unable to shutdown processor.",
+        receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+  }
+
+  static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
+      List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
+    Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
+    processors.forEach(processor -> {
+        try {
+          // Create records and call process records
+          IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
+          doNothing().when(checkpointer).checkpoint(anyString());
+          doNothing().when(checkpointer).checkpoint();
+          ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
+          when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
+          when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
+          List<Record> inputRecords = createRecords(numRecordsPerShard);
+          processorRecordMap.put(processor, inputRecords);
+          when(processRecordsInput.getRecords()).thenReturn(inputRecords);
+          processor.processRecords(processRecordsInput);
+        } catch (ShutdownException | InvalidStateException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+    return processorRecordMap;
+  }
+
+  static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
+    try {
+      ShutdownInput shutdownInput = Mockito.mock(ShutdownInput.class);
+      when(shutdownInput.getShutdownReason()).thenReturn(reason);
+      when(shutdownInput.getCheckpointer()).thenReturn(getCheckpointer(processor));
+      processor.shutdown(shutdownInput);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  static IRecordProcessorCheckpointer getCheckpointer(KinesisRecordProcessor processor)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = processor.getClass().getDeclaredField("checkpointer");
+    f.setAccessible(true);
+    return (IRecordProcessorCheckpointer) f.get(processor);
+  }
+
+  private static List<Record> createRecords(int numRecords) {
+    List<Record> records = new ArrayList<>(numRecords);
+    Random rand = new Random();
+
+    for (int i = 0; i < numRecords; i++) {
+      String dataStr = "testData-" + System.currentTimeMillis();
+      ByteBuffer data = ByteBuffer.wrap(dataStr.getBytes(StandardCharsets.UTF_8));
+      String key = String.format("partitionKey-%d", rand.nextLong());
+      String seqNum = String.format("%04d", 5 * i + 1);
+      Record record = new Record()
+          .withData(data)
+          .withPartitionKey(key)
+          .withSequenceNumber(seqNum)
+          .withApproximateArrivalTimestamp(new Date());
+      records.add(record);
+    }
+    return records;
+  }
+}
\ No newline at end of file

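Note on the test above: the TERMINATE case relies on a CountDownLatch to show that the shutdown callback is held back until the last record is checkpointed. The following is a minimal, self-contained sketch of that pattern only. DeferredShutdown is a hypothetical stand-in written for illustration, not the KinesisRecordProcessor from this commit, and the 50 ms waits are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchGateSketch {
  // Hypothetical stand-in: signals shutdown only once the last sequence number is checkpointed.
  static final class DeferredShutdown {
    private final CountDownLatch shutdownLatch;
    private final String lastSeqNum;

    DeferredShutdown(CountDownLatch shutdownLatch, String lastSeqNum) {
      this.shutdownLatch = shutdownLatch;
      this.lastSeqNum = lastSeqNum;
    }

    void checkpoint(String seqNum) {
      if (lastSeqNum.equals(seqNum)) {
        shutdownLatch.countDown();
      }
    }
  }

  // Returns {signalledBeforeLastCheckpoint, signalledAfterLastCheckpoint}.
  public static boolean[] run() {
    try {
      CountDownLatch latch = new CountDownLatch(1);
      DeferredShutdown processor = new DeferredShutdown(latch, "0021");

      // Checkpointing the last-but-one record must not release the latch.
      processor.checkpoint("0016");
      boolean early = latch.await(50, TimeUnit.MILLISECONDS);

      // Checkpointing the last record releases it.
      processor.checkpoint("0021");
      boolean done = latch.await(50, TimeUnit.MILLISECONDS);
      return new boolean[] {early, done};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    boolean[] result = run();
    System.out.println("early=" + result[0] + " done=" + result[1]);
  }
}
```

A timed await that returns false is what the "Processor shutdown too early." assertions depend on, so each negative check costs the full wait time.
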
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
new file mode 100644
index 0000000..ade02ac
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+
+import static org.apache.samza.system.kinesis.consumer.TestKinesisRecordProcessor.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * This class of tests exercises KinesisSystemConsumer and KinesisRecordProcessor together.
+ */
+public class TestKinesisSystemConsumer {
+  private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string
+
+  @Test
+  public void testProcessRecords() throws InterruptedException, ShutdownException, InvalidStateException,
+                                          NoSuchFieldException, IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    int numShards = 2;
+    int numRecordsPerShard = 5;
+
+    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
+  }
+
+  @Test
+  public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, ShutdownException,
+                                                             InvalidStateException, NoSuchFieldException,
+                                                             IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    int numShards = 1;
+    int numRecordsPerShard = 0;
+
+    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
+  }
+
+  /**
+   * Helper to simulate and test the lifecycle of record processing from a Kinesis stream with a given number of shards:
+   * 1. Creation of record processors.
+   * 2. Initialization of record processors.
+   * 3. Processing records via record processors.
+   * 4. Calling checkpoint on record processors.
+   * 5. Shutting down (due to re-assignment or lease expiration) record processors.
+   */
+  private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
+      throws InterruptedException, ShutdownException, InvalidStateException,
+             NoSuchFieldException, IllegalAccessException {
+
+    KinesisConfig kConfig = new KinesisConfig(new MapConfig());
+    // Create consumer
+    KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
+    initializeMetrics(consumer, stream);
+
+    List<SystemStreamPartition> ssps = new LinkedList<>();
+    IntStream.range(0, numShards)
+        .forEach(p -> {
+            SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p));
+            ssps.add(ssp);
+          });
+    ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));
+
+    // Create Kinesis record processor factory
+    IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);
+
+    // Create and initialize Kinesis record processor
+    Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
+    List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());
+
+    // Generate records for each Kinesis record processor
+    Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);
+
+    // Verification steps
+
+    // Read events from the BlockingEnvelopeMap (BEM) queue
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
+        readEvents(new HashSet<>(ssps), consumer, numRecordsPerShard);
+    if (numRecordsPerShard > 0) {
+      Assert.assertEquals(messages.size(), numShards);
+    } else {
+      // No input records and hence no messages
+      Assert.assertEquals(messages.size(), 0);
+      return;
+    }
+
+    Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
+    ssps.forEach(ssp -> {
+        try {
+          KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
+
+          if (numRecordsPerShard > 0) {
+            // Verify that the read messages are received in order and are the same as input records
+            Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
+            List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+            List<Record> inputRecords = inputRecordMap.get(processor);
+            verifyRecords(envelopes, inputRecords, processor.getShardId());
+
+            // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+            IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+            consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+            ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+            verify(getCheckpointer(processor)).checkpoint(argument.capture());
+            Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
+          }
+
+          // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
+          shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+          Assert.assertTrue(!sspToProcessorMap.containsValue(processor));
+          Assert.assertTrue(isSspAvailable(consumer, ssp));
+        } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+  }
+
+  private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
+    Map<String, KinesisRecordProcessor> processorMap = new HashMap<>();
+    IntStream.range(0, numShards)
+        .forEach(p -> {
+            String shardId = String.format("shard-%05d", p);
+            // Create Kinesis processor
+            KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
+
+            // Initialize the shard
+            ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+            InitializationInput initializationInput =
+                new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
+            processor.initialize(initializationInput);
+            processorMap.put(shardId, processor);
+          });
+    return processorMap;
+  }
+
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> readEvents(Set<SystemStreamPartition> ssps,
+      KinesisSystemConsumer consumer, int numEvents) throws InterruptedException {
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<>();
+    int totalEventsConsumed = 0;
+
+    while (totalEventsConsumed < numEvents) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
+          consumer.poll(ssps, Duration.ofSeconds(1).toMillis());
+      receivedMessages.forEach((key, value) -> {
+          if (messages.containsKey(key)) {
+            messages.get(key).addAll(value);
+          } else {
+            messages.put(key, new ArrayList<>(value));
+          }
+        });
+      totalEventsConsumed = messages.values().stream().mapToInt(List::size).sum();
+    }
+
+    if (totalEventsConsumed < numEvents) {
+      String msg = String.format("Received only %d of %d events", totalEventsConsumed, numEvents);
+      throw new SamzaException(msg);
+    }
+    return messages;
+  }
+
+  private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
+    Iterator outputRecordsIter = outputRecords.iterator();
+    inputRecords.forEach(record -> {
+        IncomingMessageEnvelope envelope = (IncomingMessageEnvelope) outputRecordsIter.next();
+        String outputKey = (String) envelope.getKey();
+        KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+        Assert.assertEquals(outputKey, record.getPartitionKey());
+        Assert.assertEquals(kinesisMessageEnvelope.getSequenceNumber(), record.getSequenceNumber());
+        Assert.assertEquals(kinesisMessageEnvelope.getApproximateArrivalTimestamp(),
+            record.getApproximateArrivalTimestamp());
+        Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
+        ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
+        record.getData().rewind();
+        Assert.assertTrue(outputData.equals(record.getData()));
+        verifyOffset(envelope.getOffset(), record, shardId);
+      });
+  }
+
+  private void verifyOffset(String offset, Record inputRecord, String shardId) {
+    KinesisSystemConsumerOffset ckpt = KinesisSystemConsumerOffset.parse(offset);
+    Assert.assertEquals(ckpt.getSeqNumber(), inputRecord.getSequenceNumber());
+    Assert.assertEquals(ckpt.getShardId(), shardId);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void initializeMetrics(KinesisSystemConsumer consumer, String stream)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("metrics");
+    f.setAccessible(true);
+    KinesisSystemConsumerMetrics metrics = (KinesisSystemConsumerMetrics) f.get(consumer);
+    metrics.initializeMetrics(Collections.singleton(stream));
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<SystemStreamPartition, KinesisRecordProcessor> getProcessorMap(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("processors");
+    f.setAccessible(true);
+    return (Map<SystemStreamPartition, KinesisRecordProcessor>) f.get(consumer);
+  }
+
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(KinesisSystemConsumer consumer, SystemStreamPartition ssp)
+      throws NoSuchFieldException, IllegalAccessException {
+    SSPAllocator sspAllocator = getSspAllocator(consumer);
+    Field f = sspAllocator.getClass().getDeclaredField("availableSsps");
+    f.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> availableSsps = (Map<String, Set<SystemStreamPartition>>) f.get(
+        sspAllocator);
+    return availableSsps.containsKey(ssp.getStream()) && availableSsps.get(ssp.getStream()).contains(ssp);
+  }
+
+  private SSPAllocator getSspAllocator(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("sspAllocator");
+    f.setAccessible(true);
+    return (SSPAllocator) f.get(consumer);
+  }
+
+}

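Note on readEvents above: as written, its while loop exits only once enough events have arrived, so the "Received only %d of %d events" check that follows it appears to be unreachable. One way to bound such a loop is an overall deadline. The sketch below illustrates that idea on a plain BlockingQueue; BoundedPollSketch and drain are hypothetical names, and this is not the Samza consumer API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedPollSketch {
  // Collects up to `expected` items, giving up once `timeoutMs` has elapsed overall.
  public static <T> List<T> drain(BlockingQueue<T> queue, int expected, long timeoutMs) {
    List<T> out = new ArrayList<>();
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    try {
      while (out.size() < expected) {
        long remaining = deadline - System.nanoTime();
        if (remaining <= 0) {
          break; // Deadline reached: the caller decides whether a short read is an error.
        }
        T item = queue.poll(remaining, TimeUnit.NANOSECONDS);
        if (item != null) {
          out.add(item);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return out;
  }

  public static void main(String[] args) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("a");
    queue.add("b");
    System.out.println(drain(queue, 2, 200).size());
  }
}
```

With a deadline in place, a size check after the loop becomes meaningful because the loop can end with fewer items than requested.
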
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
new file mode 100644
index 0000000..615a06e
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestKinesisSystemConsumerOffset {
+  @Test
+  public void testEquality() {
+    KinesisSystemConsumerOffset inCkpt = new KinesisSystemConsumerOffset("shard-00000", "123456");
+    KinesisSystemConsumerOffset outCkpt = KinesisSystemConsumerOffset.parse(inCkpt.toString());
+    Assert.assertEquals(inCkpt, outCkpt);
+  }
+
+  @Test
+  public void testInEquality() {
+    KinesisSystemConsumerOffset inCkpt = new KinesisSystemConsumerOffset("shard-00000", "123456");
+
+    // With different shardId
+    KinesisSystemConsumerOffset inCkpt1 = new KinesisSystemConsumerOffset("shard-00001", "123456");
+    KinesisSystemConsumerOffset outCkpt = KinesisSystemConsumerOffset.parse(inCkpt1.toString());
+    Assert.assertTrue(!inCkpt.equals(outCkpt));
+
+    // With different seqNumber
+    inCkpt1 = new KinesisSystemConsumerOffset("shard-00000", "123457");
+    outCkpt = KinesisSystemConsumerOffset.parse(inCkpt1.toString());
+    Assert.assertTrue(!inCkpt.equals(outCkpt));
+  }
+}

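Note on the offset tests above: they check that toString followed by parse yields an equal object, and that a different shard ID or sequence number yields an unequal one. The sketch below shows the same round-trip property on a hypothetical value class. The "shardId:seqNumber" text form is an assumption made for illustration; the serialized format of KinesisSystemConsumerOffset is not shown in this part of the diff.

```java
import java.util.Objects;

public final class ShardOffsetSketch {
  private final String shardId;
  private final String seqNumber;

  public ShardOffsetSketch(String shardId, String seqNumber) {
    this.shardId = Objects.requireNonNull(shardId);
    this.seqNumber = Objects.requireNonNull(seqNumber);
  }

  // Assumed text form "shardId:seqNumber"; split on the last ':' so shard IDs may contain '-'.
  public static ShardOffsetSketch parse(String text) {
    int idx = text.lastIndexOf(':');
    if (idx < 0) {
      throw new IllegalArgumentException("Malformed offset: " + text);
    }
    return new ShardOffsetSketch(text.substring(0, idx), text.substring(idx + 1));
  }

  @Override
  public String toString() {
    return shardId + ":" + seqNumber;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ShardOffsetSketch)) {
      return false;
    }
    ShardOffsetSketch other = (ShardOffsetSketch) o;
    return shardId.equals(other.shardId) && seqNumber.equals(other.seqNumber);
  }

  @Override
  public int hashCode() {
    return Objects.hash(shardId, seqNumber);
  }

  public static void main(String[] args) {
    ShardOffsetSketch in = new ShardOffsetSketch("shard-00000", "123456");
    System.out.println(in.equals(parse(in.toString())));
  }
}
```

Defining equals and hashCode together is what lets Assert.assertEquals compare the parsed copy by value rather than by reference.
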
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
new file mode 100644
index 0000000..0533a29
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSSPAllocator {
+  @Test
+  public void testAllocateAndFree() throws NoAvailablePartitionException, NoSuchFieldException, IllegalAccessException {
+    int numPartitions = 2;
+    String system = "kinesis";
+    String stream = "stream";
+    List<SystemStreamPartition> ssps = new ArrayList<>();
+    IntStream.range(0, numPartitions)
+        .forEach(i -> ssps.add(new SystemStreamPartition(system, stream, new Partition(i))));
+
+    SSPAllocator allocator = new SSPAllocator();
+    ssps.forEach(allocator::free);
+
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+
+    SystemStreamPartition ssp = allocator.allocate(stream);
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+    Assert.assertEquals(ssp, ssps.get(0));
+
+    ssp = allocator.allocate(stream);
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(1)));
+    Assert.assertEquals(ssp, ssps.get(1));
+
+    allocator.free(ssps.get(1));
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+
+    allocator.free(ssps.get(0));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+  }
+
+  @Test (expected = NoAvailablePartitionException.class)
+  public void testAssignMoreThanMaxPartitions() throws NoAvailablePartitionException {
+    int numPartitions = 2;
+    String system = "kinesis";
+    String stream = "stream";
+    List<SystemStreamPartition> ssps = new ArrayList<>();
+    IntStream.range(0, numPartitions)
+        .forEach(i -> ssps.add(new SystemStreamPartition(system, stream, new Partition(i))));
+
+    SSPAllocator allocator = new SSPAllocator();
+    ssps.forEach(allocator::free);
+
+    allocator.allocate(stream);
+    allocator.allocate(stream);
+    allocator.allocate(stream); // An exception should be thrown at this point.
+  }
+
+  @Test (expected = IllegalArgumentException.class)
+  public void testFreeSameSspTwice() throws NoAvailablePartitionException {
+    int numPartitions = 2;
+    String system = "kinesis";
+    String stream = "stream";
+    List<SystemStreamPartition> ssps = new ArrayList<>();
+    IntStream.range(0, numPartitions)
+        .forEach(i -> ssps.add(new SystemStreamPartition(system, stream, new Partition(i))));
+
+    SSPAllocator allocator = new SSPAllocator();
+    ssps.forEach(allocator::free);
+
+    SystemStreamPartition ssp = allocator.allocate(stream);
+    allocator.free(ssp);
+    allocator.free(ssp); // An exception should be thrown at this point.
+  }
+
+  @Test (expected = IllegalArgumentException.class)
+  public void testFreeUnallocatedSsp() throws NoAvailablePartitionException {
+    int numPartitions = 2;
+    String system = "kinesis";
+    String stream = "stream";
+    List<SystemStreamPartition> ssps = new ArrayList<>();
+    IntStream.range(0, numPartitions)
+        .forEach(i -> ssps.add(new SystemStreamPartition(system, stream, new Partition(i))));
+
+    SSPAllocator allocator = new SSPAllocator();
+    ssps.forEach(allocator::free);
+
+    allocator.allocate(stream);
+    allocator.free(ssps.get(1)); // An exception should be thrown at this point.
+  }
+
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(SSPAllocator sspAllocator, SystemStreamPartition ssp) throws NoSuchFieldException, IllegalAccessException {
+    Field f = sspAllocator.getClass().getDeclaredField("availableSsps");
+    f.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> availableSsps = (Map<String, Set<SystemStreamPartition>>) f.get(
+        sspAllocator);
+    return availableSsps.containsKey(ssp.getStream()) && availableSsps.get(ssp.getStream()).contains(ssp);
+  }
+}

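Note on the allocator tests above: they imply a contract in which free makes a partition available, allocate hands out available partitions in the order they were freed, and freeing a partition that is already available is rejected. The sketch below is one possible reading of that contract, not the SSPAllocator source. PartitionPoolSketch is a hypothetical name, partitions are plain ints, and IllegalStateException stands in for the checked NoAvailablePartitionException.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class PartitionPoolSketch {
  // Available partitions per stream; LinkedHashSet keeps the order in which they were freed.
  private final Map<String, Set<Integer>> available = new HashMap<>();

  public synchronized int allocate(String stream) {
    Set<Integer> free = available.get(stream);
    if (free == null || free.isEmpty()) {
      // Assumption: stands in for NoAvailablePartitionException in the real code.
      throw new IllegalStateException("No available partition for stream " + stream);
    }
    Iterator<Integer> it = free.iterator();
    int partition = it.next();
    it.remove();
    return partition;
  }

  public synchronized void free(String stream, int partition) {
    boolean added = available.computeIfAbsent(stream, s -> new LinkedHashSet<>()).add(partition);
    if (!added) {
      throw new IllegalArgumentException("Partition " + partition + " is already available");
    }
  }

  public static void main(String[] args) {
    PartitionPoolSketch pool = new PartitionPoolSketch();
    pool.free("stream", 0);
    pool.free("stream", 1);
    System.out.println(pool.allocate("stream") + " " + pool.allocate("stream"));
  }
}
```

Rejecting a duplicate free covers both testFreeSameSspTwice and testFreeUnallocatedSsp, since in each case the partition is already in the available set.
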
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 0fe3dfa..e50e816 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -24,9 +24,8 @@ include \
   'samza-rest',
   'samza-shell',
   'samza-azure',
-  'samza-sql'
-
-
+  'samza-sql',
+  'samza-aws'
 
 def scalaModules = [
         'samza-core',


[4/8] samza git commit: SAMZA-1515; Implement a consumer for Kinesis

Posted by xi...@apache.org.
SAMZA-1515; Implement a consumer for Kinesis

Author: Aditya Toomula <at...@atoomula-ld1.linkedin.biz>

Reviewers: Jagadish <ja...@apache.org>

Closes #368 from atoomula/kinesis


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9961023f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9961023f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9961023f

Branch: refs/heads/0.14.0
Commit: 9961023f7bf7c4b19804fb4e50a14c86d6fc9233
Parents: 5e68d62
Author: Aditya Toomula <at...@atoomula-ld1.linkedin.biz>
Authored: Tue Nov 28 13:12:10 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 28 13:12:10 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  33 ++
 .../kinesis/KinesisAWSCredentialsProvider.java  |  69 +++++
 .../samza/system/kinesis/KinesisConfig.java     | 287 ++++++++++++++++++
 .../system/kinesis/KinesisSystemAdmin.java      | 124 ++++++++
 .../system/kinesis/KinesisSystemFactory.java    |  87 ++++++
 .../KinesisIncomingMessageEnvelope.java         |  62 ++++
 .../consumer/KinesisRecordProcessor.java        | 208 +++++++++++++
 .../KinesisRecordProcessorListener.java         |  51 ++++
 .../kinesis/consumer/KinesisSystemConsumer.java | 256 ++++++++++++++++
 .../consumer/KinesisSystemConsumerOffset.java   | 107 +++++++
 .../consumer/NoAvailablePartitionException.java |  38 +++
 .../system/kinesis/consumer/SSPAllocator.java   |  73 +++++
 .../metrics/KinesisSystemConsumerMetrics.java   | 106 +++++++
 .../system/kinesis/metrics/SamzaHistogram.java  |  63 ++++
 .../TestKinesisAWSCredentialsProvider.java      |  60 ++++
 .../samza/system/kinesis/TestKinesisConfig.java | 132 ++++++++
 .../kinesis/TestKinesisSystemFactory.java       | 115 +++++++
 .../consumer/TestKinesisRecordProcessor.java    | 301 +++++++++++++++++++
 .../consumer/TestKinesisSystemConsumer.java     | 270 +++++++++++++++++
 .../TestKinesisSystemConsumerOffset.java        |  48 +++
 .../kinesis/consumer/TestSSPAllocator.java      | 127 ++++++++
 settings.gradle                                 |   5 +-
 22 files changed, 2619 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 59ff5f2..eddb11c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -220,6 +220,39 @@ project(':samza-azure') {
   }
 }
 
+project(':samza-aws') {
+  apply plugin: 'java'
+  apply plugin: 'checkstyle'
+
+  dependencies {
+    compile "com.amazonaws:aws-java-sdk-kinesis:1.11.152"
+    compile "com.amazonaws:amazon-kinesis-client:1.7.5"
+    compile "com.amazonaws:amazon-kinesis-producer:0.10.0"
+    compile "io.dropwizard.metrics:metrics-core:3.1.2"
+    compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
+    compile "org.codehaus.jackson:jackson-mapper-asl:1.9.7"
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
+    runtime "org.apache.httpcomponents:httpclient:4.5.2"
+    runtime "org.apache.httpcomponents:httpcore:4.4.5"
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+  }
+
+  repositories {
+    maven {
+      url "https://repo1.maven.org/maven2/"
+    }
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    toolVersion = "$checkstyleVersion"
+  }
+}
+
+
 project(":samza-autoscaling_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'checkstyle'

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
new file mode 100644
index 0000000..a37cfb4
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+
+
+/**
+ * AWSCredentialsProvider implementation that takes in accessKey and secretKey directly. Requires both accessKey and
+ * secretKey to be non-empty for it to create a BasicAWSCredentials instance. Otherwise, it creates an AWSCredentials
+ * instance whose key getters return null.
+ */
+public class KinesisAWSCredentialsProvider implements AWSCredentialsProvider {
+  private final AWSCredentials creds;
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisAWSCredentialsProvider.class.getName());
+
+  public KinesisAWSCredentialsProvider(String accessKey, String secretKey) {
+    if (StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey)) {
+      creds = new AWSCredentials() {
+        @Override
+        public String getAWSAccessKeyId() {
+          return null;
+        }
+
+        @Override
+        public String getAWSSecretKey() {
+          return null;
+        }
+      };
+      LOG.info("Could not load credentials from KinesisAWSCredentialsProvider");
+    } else {
+      creds = new BasicAWSCredentials(accessKey, secretKey);
+      LOG.info("Loaded credentials from KinesisAWSCredentialsProvider");
+    }
+  }
+
+  @Override
+  public AWSCredentials getCredentials() {
+    return creds;
+  }
+
+  @Override
+  public void refresh() {
+    //no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
new file mode 100644
index 0000000..a4ac40d
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.ClientConfiguration;
+
+
+/**
+ * Configs for Kinesis system. It contains three sets of configs:
+ * <ol>
+ *   <li> Configs required by Samza Kinesis Consumer.
+ *   <li> Configs that are AWS client specific provided at system scope {@link ClientConfiguration}
+ *   <li> Configs that are KCL specific (could be provided either at system scope or stream scope)
+ *        {@link KinesisClientLibConfiguration}
+ * </ol>
+ */
+public class KinesisConfig extends MapConfig {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName());
+
+  private static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
+  private static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
+
+  private static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
+  private static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
+
+  private static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
+  private static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
+  private static final String DEFAULT_CONFIG_PROXY_HOST = "";
+  private static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
+  private static final int DEFAULT_CONFIG_PROXY_PORT = 0;
+
+  private static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
+  private static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";
+
+  public KinesisConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * Return a set of streams from the config for a given system.
+   * @param system name of the system
+   * @return a set of streams
+   */
+  public Set<String> getKinesisStreams(String system) {
+    // build stream-level configs
+    Config streamsConfig = subset(String.format("systems.%s.streams.", system), true);
+    // all properties should now start with stream name
+    Set<String> streams = new HashSet<>();
+    streamsConfig.keySet().forEach(key -> {
+        String[] parts = key.split("\\.", 2);
+        if (parts.length != 2) {
+          throw new IllegalArgumentException("Ill-formatted stream config: " + key);
+        }
+        streams.add(parts[0]);
+      });
+    return streams;
+  }
+
+  /**
+   * Get KCL config for a given system stream.
+   * @param system name of the system
+   * @param stream name of the stream
+   * @param appName name of the application
+   * @return Stream scoped KCL configs required to build
+   *         {@link KinesisClientLibConfiguration}
+   */
+  public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) {
+    ClientConfiguration clientConfig = getAWSClientConfig(system);
+    String workerId = appName + "-" + UUID.randomUUID();
+    InitialPositionInStream startPos = InitialPositionInStream.LATEST;
+    AWSCredentialsProvider provider = credentialsProviderForStream(system, stream);
+    KinesisClientLibConfiguration kinesisClientLibConfiguration =
+        new KinesisClientLibConfiguration(appName, stream, provider, workerId)
+            .withRegionName(getRegion(system, stream).getName())
+            .withKinesisClientConfig(clientConfig)
+            .withCloudWatchClientConfig(clientConfig)
+            .withDynamoDBClientConfig(clientConfig)
+            .withInitialPositionInStream(startPos)
+            .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics.
+    // First, get system scoped configs for KCL and override with configs set at stream scope.
+    setKinesisClientLibConfigs(
+        subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system)), kinesisClientLibConfiguration);
+    setKinesisClientLibConfigs(subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream)),
+        kinesisClientLibConfiguration);
+    return kinesisClientLibConfiguration;
+  }
+
+  /**
+   * Get the Kinesis secret key for the system stream
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis secret key
+   */
+  protected String getStreamSecretKey(String system, String stream) {
+    return get(String.format(CONFIG_STREAM_SECRET_KEY, system, stream));
+  }
+
+  /**
+   * Get SSL socket factory for the proxy for a given system
+   * @param system name of the system
+   * @return ConnectionSocketFactory
+   */
+  protected ConnectionSocketFactory getSSLSocketFactory(String system) {
+    return null;
+  }
+
+  /**
+   * @param system name of the system
+   * @return {@link ClientConfiguration} which has options controlling how the client connects to Kinesis
+   *         (e.g. proxy settings, retry counts, etc.)
+   */
+  ClientConfiguration getAWSClientConfig(String system) {
+    ClientConfiguration awsClientConfig = new ClientConfiguration();
+    setAwsClientConfigs(subset(String.format(CONFIG_AWS_CLIENT_CONFIG, system)), awsClientConfig);
+    awsClientConfig.getApacheHttpClientConfig().setSslSocketFactory(getSSLSocketFactory(system));
+    return awsClientConfig;
+  }
+
+  /**
+   * Get the proxy host as a system level config. This is needed when
+   * users need to go through a proxy for the Kinesis connections.
+   * @param system name of the system
+   * @return proxy host name or empty string if not defined
+   */
+  String getProxyHost(String system) {
+    return get(String.format(CONFIG_PROXY_HOST, system), DEFAULT_CONFIG_PROXY_HOST);
+  }
+
+  /**
+   * Get the proxy port number as a system level config. This is needed when
+   * users need to go through a proxy for the Kinesis connections.
+   * @param system name of the system
+   * @return proxy port number or 0 if not defined
+   */
+  int getProxyPort(String system) {
+    return getInt(String.format(CONFIG_PROXY_PORT, system), DEFAULT_CONFIG_PROXY_PORT);
+  }
+
+  /**
+   * Get the Kinesis region for the system stream
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis region
+   */
+  Region getRegion(String system, String stream) {
+    String name = get(String.format(CONFIG_STREAM_REGION, system, stream),
+        get(String.format(CONFIG_SYSTEM_REGION, system)));
+    return Region.getRegion(Regions.fromName(name));
+  }
+
+  /**
+   * Get the Kinesis access key name for the system stream
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis access key
+   */
+  String getStreamAccessKey(String system, String stream) {
+    return get(String.format(CONFIG_STREAM_ACCESS_KEY, system, stream));
+  }
+
+  /**
+   * Get the appropriate CredentialProvider for a given system stream.
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return AWSCredentialsProvider
+   */
+  AWSCredentialsProvider credentialsProviderForStream(String system, String stream) {
+    // Try to load credentials in the following order:
+    // 1. Access key from the config and passed in secretKey
+    // 2. From the default credential provider chain (environment variables, system properties, AWS profile file, etc)
+    return new AWSCredentialsProviderChain(
+        new KinesisAWSCredentialsProvider(getStreamAccessKey(system, stream), getStreamSecretKey(system, stream)),
+        new DefaultAWSCredentialsProviderChain());
+  }
+
+  private void setAwsClientConfigs(Config config, ClientConfiguration clientConfig) {
+    for (Entry<String, String> entry : config.entrySet()) {
+      boolean found = false;
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (StringUtils.isEmpty(value)) {
+        continue;
+      }
+      for (Method method : ClientConfiguration.class.getMethods()) {
+        // For each property invoke the corresponding setter, if it exists
+        if (method.getName().equals("set" + key)) {
+          found = true;
+          Class<?> type = method.getParameterTypes()[0];
+          try {
+            if (type == long.class) {
+              method.invoke(clientConfig, Long.valueOf(value));
+            } else if (type == int.class) {
+              method.invoke(clientConfig, Integer.valueOf(value));
+            } else if (type == boolean.class) {
+              method.invoke(clientConfig, Boolean.valueOf(value));
+            } else if (type == String.class) {
+              method.invoke(clientConfig, value);
+            }
+            LOG.info("Loaded property " + key + " = " + value);
+            break;
+          } catch (Exception e) {
+            throw new IllegalArgumentException(
+                String.format("Error trying to set field %s with the value '%s'", key, value), e);
+          }
+        }
+      }
+      if (!found) {
+        LOG.warn("Property " + key + " ignored as there is no corresponding set method");
+      }
+    }
+  }
+
+  private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) {
+    for (Entry<String, String> entry : config.entrySet()) {
+      boolean found = false;
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (StringUtils.isEmpty(value)) {
+        continue;
+      }
+      for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
+        if (method.getName().equals("with" + key)) {
+          found = true;
+          Class<?> type = method.getParameterTypes()[0];
+          try {
+            if (type == long.class) {
+              method.invoke(kinesisLibConfig, Long.valueOf(value));
+            } else if (type == int.class) {
+              method.invoke(kinesisLibConfig, Integer.valueOf(value));
+            } else if (type == boolean.class) {
+              method.invoke(kinesisLibConfig, Boolean.valueOf(value));
+            } else if (type == String.class) {
+              method.invoke(kinesisLibConfig, value);
+            } else if (type == InitialPositionInStream.class) {
+              method.invoke(kinesisLibConfig, InitialPositionInStream.valueOf(value.toUpperCase()));
+            }
+            LOG.info("Loaded property " + key + " = " + value);
+            break;
+          } catch (Exception e) {
+            throw new IllegalArgumentException(
+                String.format("Error trying to set field %s with the value '%s'", key, value), e);
+          }
+        }
+      }
+      if (!found) {
+        LOG.warn("Property " + key + " ignored as there is no corresponding with method");
+      }
+    }
+  }
+}
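
The two-pass override in getKinesisClientLibConfig (system scope first, then stream scope) combined with the reflective "with" + key lookup can be illustrated with a minimal, self-contained sketch. FakeKclConfig is a hypothetical stand-in for KinesisClientLibConfiguration so that the example runs without the AWS SDK. The property names and default values are chosen for illustration only, and only int and long parameters are handled.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

class ScopedConfigSketch {

  /** Hypothetical stand-in for KinesisClientLibConfiguration with fluent "with" setters. */
  static class FakeKclConfig {
    private int maxRecords = 10000;
    private long idleTimeBetweenReadsInMillis = 1000L;

    public FakeKclConfig withMaxRecords(int maxRecords) {
      this.maxRecords = maxRecords;
      return this;
    }

    public FakeKclConfig withIdleTimeBetweenReadsInMillis(long millis) {
      this.idleTimeBetweenReadsInMillis = millis;
      return this;
    }

    public int getMaxRecords() {
      return maxRecords;
    }

    public long getIdleTimeBetweenReadsInMillis() {
      return idleTimeBetweenReadsInMillis;
    }
  }

  /** Invokes "with" + key on the target for each entry and returns how many properties were applied. */
  static int apply(Map<String, String> props, Object target) {
    int applied = 0;
    for (Map.Entry<String, String> entry : props.entrySet()) {
      for (Method method : target.getClass().getMethods()) {
        if (!method.getName().equals("with" + entry.getKey()) || method.getParameterCount() != 1) {
          continue;
        }
        Class<?> type = method.getParameterTypes()[0];
        try {
          if (type == int.class) {
            method.invoke(target, Integer.valueOf(entry.getValue()));
          } else if (type == long.class) {
            method.invoke(target, Long.valueOf(entry.getValue()));
          } else {
            continue; // parameter type not supported by this sketch
          }
        } catch (ReflectiveOperationException e) {
          throw new IllegalArgumentException("Cannot set " + entry.getKey(), e);
        }
        applied++;
        break;
      }
    }
    return applied;
  }

  public static void main(String[] args) {
    // Keys as they would look after subset() strips the "systems.<system>.aws.kcl." prefix.
    Map<String, String> systemScope = new LinkedHashMap<>();
    systemScope.put("MaxRecords", "500");
    systemScope.put("IdleTimeBetweenReadsInMillis", "250");
    // Keys after stripping "systems.<system>.streams.<stream>.aws.kcl.".
    Map<String, String> streamScope = new LinkedHashMap<>();
    streamScope.put("MaxRecords", "100");

    FakeKclConfig config = new FakeKclConfig();
    apply(systemScope, config); // system scope goes first
    apply(streamScope, config); // stream scope overrides it
    System.out.println(config.getMaxRecords() + " " + config.getIdleTimeBetweenReadsInMillis());
  }
}
```

Because the stream-scoped map is applied last, a property set at both scopes ends up with the stream value, while properties set only at system scope are kept.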

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
new file mode 100644
index 0000000..4843276
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+
+
+/**
+ * A Kinesis-based implementation of SystemAdmin.
+ */
+public class KinesisSystemAdmin implements SystemAdmin {
+
+  private static final SystemStreamMetadata.SystemStreamPartitionMetadata SYSTEM_STREAM_PARTITION_METADATA =
+      new SystemStreamMetadata.SystemStreamPartitionMetadata(ExtendedSequenceNumber.TRIM_HORIZON.getSequenceNumber(),
+          ExtendedSequenceNumber.LATEST.getSequenceNumber(),
+          ExtendedSequenceNumber.LATEST.getSequenceNumber());
+
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemAdmin.class.getName());
+
+  private final String system;
+  private final KinesisConfig kConfig;
+
+  public KinesisSystemAdmin(String system, KinesisConfig kConfig) {
+    this.system = system;
+    this.kConfig = kConfig;
+  }
+
+  /**
+   * Source of truth for checkpointing is always Kinesis and the offsets written to the Samza checkpoint topic are
+   * ignored. Hence, getOffsetsAfter maps every supplied ssp to a null offset.
+   */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<>();
+
+    for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
+      offsetsAfter.put(systemStreamPartition, null);
+    }
+
+    return offsetsAfter;
+  }
+
+  /**
+   * Source of truth for checkpointing is always Kinesis and the offsets given by Samza are always ignored by KCL.
+   * Hence, return a placeholder for each ssp.
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return streamNames.stream().collect(Collectors.toMap(Function.identity(), this::createSystemStreamMetadata));
+  }
+
+  private SystemStreamMetadata createSystemStreamMetadata(String stream) {
+    LOG.info("create stream metadata for stream {} based on aws stream", stream);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadata = new HashMap<>();
+    AmazonKinesisClient client = null;
+
+    try {
+      ClientConfiguration clientConfig = kConfig.getAWSClientConfig(system);
+      AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+          .withCredentials(kConfig.credentialsProviderForStream(system, stream))
+          .withClientConfiguration(clientConfig);
+      builder.setRegion(kConfig.getRegion(system, stream).getName());
+      client = (AmazonKinesisClient) builder.build();
+      StreamDescription desc = client.describeStream(stream).getStreamDescription();
+      IntStream.range(0, desc.getShards().size())
+          .forEach(i -> metadata.put(new Partition(i), SYSTEM_STREAM_PARTITION_METADATA));
+    } catch (Exception e) {
+      String errMsg = "couldn't load metadata for stream " + stream;
+      LOG.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    } finally {
+      if (client != null) {
+        client.shutdown();
+      }
+    }
+
+    return new SystemStreamMetadata(stream, metadata);
+  }
+
+  /**
+   * Checkpoints are written to KCL, which is always the source of truth. The format of Samza offsets differs from
+   * that of Kinesis checkpoints, so Samza offsets are not comparable. Hence, return null.
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
new file mode 100644
index 0000000..558e871
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+
+import org.apache.samza.system.kinesis.consumer.KinesisSystemConsumer;
+
+
+/**
+ * A Kinesis-based implementation of SystemFactory.
+ */
+public class KinesisSystemFactory implements SystemFactory {
+  @Override
+  public SystemConsumer getConsumer(String system, Config config, MetricsRegistry registry) {
+    KinesisConfig kConfig = new KinesisConfig(config);
+    return new KinesisSystemConsumer(system, kConfig, registry);
+  }
+
+  @Override
+  public SystemProducer getProducer(String system, Config config, MetricsRegistry registry) {
+    return null;
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String system, Config config) {
+    validateConfig(system, config);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    return new KinesisSystemAdmin(system, kConfig);
+  }
+
+  protected void validateConfig(String system, Config config) {
+    // Kinesis system does not support groupers other than AllSspToSingleTaskGrouper
+    JobConfig jobConfig = new JobConfig(config);
+    if (!jobConfig.getSystemStreamPartitionGrouperFactory().equals(
+        AllSspToSingleTaskGrouperFactory.class.getCanonicalName())) {
+      String errMsg = String.format("Incorrect Grouper %s used for KinesisSystemConsumer %s. Please set the %s config"
+              + " to %s.", jobConfig.getSystemStreamPartitionGrouperFactory(), system,
+          JobConfig.SSP_GROUPER_FACTORY(), AllSspToSingleTaskGrouperFactory.class.getCanonicalName());
+      throw new ConfigException(errMsg);
+    }
+
+    // Kinesis streams cannot be configured as broadcast streams
+    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    if (taskConfig.getBroadcastSystemStreams().stream().anyMatch(ss -> system.equals(ss.getSystem()))) {
+      throw new ConfigException("Kinesis streams cannot be configured as broadcast streams.");
+    }
+
+    // Kinesis streams cannot be configured as bootstrap streams
+    KinesisConfig kConfig = new KinesisConfig(config);
+    kConfig.getKinesisStreams(system).forEach(stream -> {
+        StreamConfig streamConfig = new StreamConfig(kConfig);
+        SystemStream ss = new SystemStream(system, stream);
+        if (streamConfig.getBootstrapEnabled(ss)) {
+          throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
+        }
+      });
+  }
+}
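
The three rules enforced by validateConfig can be sketched against a plain map, without Samza on the classpath. The grouper key is what JobConfig.SSP_GROUPER_FACTORY() is assumed to resolve to. The task.broadcast.inputs and samza.bootstrap key names are likewise assumptions based on the Samza configuration reference and do not appear in this diff. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class KinesisJobConfigCheck {
  // Assumed key names; none of them is spelled out in the diff above.
  static final String GROUPER_KEY = "job.systemstreampartition.grouper.factory";
  static final String BROADCAST_KEY = "task.broadcast.inputs";
  static final String REQUIRED_GROUPER =
      "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory";

  /** Returns the first violated rule for the given Kinesis system, or empty if the config passes. */
  static Optional<String> firstViolation(String system, Map<String, String> config) {
    // Rule 1: only the all-SSPs-to-single-task grouper is supported.
    if (!REQUIRED_GROUPER.equals(config.get(GROUPER_KEY))) {
      return Optional.of("grouper");
    }
    // Rule 2: no stream of this system may be a broadcast input ("system.stream#partitions").
    for (String input : config.getOrDefault(BROADCAST_KEY, "").split(",")) {
      if (input.trim().startsWith(system + ".")) {
        return Optional.of("broadcast");
      }
    }
    // Rule 3: no stream of this system may be a bootstrap stream.
    String prefix = "systems." + system + ".streams.";
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(prefix) && key.endsWith(".samza.bootstrap")
          && Boolean.parseBoolean(entry.getValue())) {
        return Optional.of("bootstrap");
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(GROUPER_KEY, REQUIRED_GROUPER);
    config.put("systems.kinesis.streams.clicks.aws.region", "us-west-2");
    System.out.println(firstViolation("kinesis", config).orElse("ok"));
  }
}
```

In this sketch a job that leaves the grouper key unset fails the first rule. That mirrors the real check only under the assumption that the default grouper returned by JobConfig is something other than AllSspToSingleTaskGrouperFactory.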

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
new file mode 100644
index 0000000..95e6b6a
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.Date;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Kinesis record with payload and some metadata.
+ */
+public class KinesisIncomingMessageEnvelope extends IncomingMessageEnvelope {
+  private final String shardId;
+  private final String sequenceNumber;
+  private final Date approximateArrivalTimestamp;
+
+  public KinesisIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key,
+      Object message, String shardId, String sequenceNumber, Date approximateArrivalTimestamp) {
+    super(systemStreamPartition, offset, key, message);
+    this.shardId = shardId;
+    this.sequenceNumber = sequenceNumber;
+    this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+  }
+
+  public String getShardId() {
+    return shardId;
+  }
+
+  public String getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  public Date getApproximateArrivalTimestamp() {
+    return approximateArrivalTimestamp;
+  }
+
+  @Override
+  public String toString() {
+    return "KinesisIncomingMessageEnvelope:: shardId:" + shardId + ", sequenceNumber:" + sequenceNumber
+        + ", approximateArrivalTimestamp:" + approximateArrivalTimestamp + ", message:" + getMessage();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
new file mode 100644
index 0000000..53ff27f
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.List;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * Record processor for an AWS Kinesis stream. It does the following:
+ * <ul>
+ *   <li> when a shard is assigned by KCL in initialize API, it asks and gets an ssp from sspAllocator.
+ *   <li> when records are received in processRecords API, it translates them to IncomingMessageEnvelope and enqueues
+ *        the resulting envelope in the appropriate blocking buffer queue.
+ *   <li> when checkpoint API is called by Samza, it checkpoints via KCL to Kinesis.
+ *   <li> when shutdown API is called by KCL, based on the terminate reason, it takes necessary action.
+ * </ul>
+ *
+ * initialize, processRecords and shutdown APIs are never called concurrently on a processor instance. However,
+ * checkpoint API could be called by Samza thread while processRecords and shutdown callback APIs are invoked by KCL.
+ * Please note that the APIs for different record processor instances could be called concurrently.
+ */
+
+public class KinesisRecordProcessor implements IRecordProcessor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class.getName());
+  static final long POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS = 1000;
+
+  private final SystemStreamPartition ssp;
+
+  private String shardId;
+  private KinesisRecordProcessorListener listener;
+  private IRecordProcessorCheckpointer checkpointer;
+  private ExtendedSequenceNumber initSeqNumber;
+
+  private volatile ExtendedSequenceNumber lastProcessedRecordSeqNumber;
+  private volatile ExtendedSequenceNumber lastCheckpointedRecordSeqNumber;
+
+  private boolean shutdownRequested = false;
+
+  KinesisRecordProcessor(SystemStreamPartition ssp, KinesisRecordProcessorListener listener) {
+    this.ssp = ssp;
+    this.listener = listener;
+  }
+
+  /**
+   * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
+   * (via processRecords).
+   *
+   * @param initializationInput Provides information related to initialization
+   */
+  @Override
+  public void initialize(InitializationInput initializationInput) {
+    Validate.isTrue(listener != null, "There is no listener set for the processor.");
+    initSeqNumber = initializationInput.getExtendedSequenceNumber();
+    shardId = initializationInput.getShardId();
+    LOG.info("Initialization done for {} with sequence {}", this,
+        initializationInput.getExtendedSequenceNumber().getSequenceNumber());
+  }
+
+  /**
+   * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
+   * application. Upon failover, the new instance will get records with sequence numbers greater than the checkpoint
+   * position for each partition key.
+   *
+   * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
+   *        to them (e.g. checkpointing).
+   */
+  @Override
+  public void processRecords(ProcessRecordsInput processRecordsInput) {
+    // KCL does not send any records to a processor that has been shut down.
+    Validate.isTrue(!shutdownRequested,
+        String.format("KCL returned records after shutdown is called on the processor %s.", this));
+    // KCL always gives a reference to the same checkpointer instance for a given processor instance.
+    checkpointer = processRecordsInput.getCheckpointer();
+    List<Record> records = processRecordsInput.getRecords();
+    // Empty records are expected when KCL config has CallProcessRecordsEvenForEmptyRecordList set to true.
+    if (!records.isEmpty()) {
+      lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber());
+      listener.onReceiveRecords(ssp, records, processRecordsInput.getMillisBehindLatest());
+    }
+  }
+
+  /**
+   * Invoked by the Samza thread to commit checkpoint for the shard owned by the record processor instance.
+   *
+   * @param seqNumber sequenceNumber to checkpoint for the shard owned by this processor instance.
+   */
+  public void checkpoint(String seqNumber) {
+    ExtendedSequenceNumber seqNumberToCheckpoint = new ExtendedSequenceNumber(seqNumber);
+    if (initSeqNumber.compareTo(seqNumberToCheckpoint) > 0) {
+      LOG.warn("Samza called checkpoint with seqNumber {} smaller than initial seqNumber {} for {}. Ignoring it!",
+          seqNumber, initSeqNumber, this);
+      return;
+    }
+
+    if (checkpointer == null) {
+      // checkpointer could be null as a result of shard re-assignment before the first record is processed.
+      LOG.warn("Ignoring checkpointing for {} with seqNumber {} because of re-assignment.", this, seqNumber);
+      return;
+    }
+
+    try {
+      checkpointer.checkpoint(seqNumber);
+      lastCheckpointedRecordSeqNumber = seqNumberToCheckpoint;
+    } catch (ShutdownException e) {
+      // This can happen as a result of shard re-assignment.
+      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Dropping the checkpoint.",
+          this, seqNumber);
+      LOG.warn(msg, e);
+    } catch (InvalidStateException e) {
+      // This can happen when KCL encounters issues with internal state, e.g. the DynamoDB table is not found.
+      String msg =
+          String.format("Checkpointing %s with seqNumber %s failed with exception.", this, seqNumber);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    } catch (ThrottlingException e) {
+      // Throttling is handled by KCL via the client lib configuration properties. If we get an exception in spite of
+      // the throttling back-off behavior, throw an exception because the configs need to be adjusted.
+      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Checkpoint interval is"
+              + " too aggressive for the provisioned throughput of the DynamoDB table where the checkpoints are stored."
+              + " Either increase the checkpoint interval -or- increase the throughput of the DynamoDB table.", this,
+          seqNumber);
+      throw new SamzaException(msg, e);
+    }
+  }
+
+  /**
+   * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
+   * RecordProcessor instance.
+   *
+   * @param shutdownInput Provides information and capabilities (e.g. checkpointing) related to shutdown of this record
+   *        processor.
+   */
+  @Override
+  public void shutdown(ShutdownInput shutdownInput) {
+    LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());
+
+    Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
+    shutdownRequested = true;
+    // shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the
+    // records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the
+    // progress.
+    if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
+      // We need to ensure that all records are processed and checkpointed before going ahead and marking the processing
+      // complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL
+      // to consume from the child shard(s).
+      try {
+        LOG.info("Waiting for all the records for {} to be processed.", this);
+        // Let's poll periodically and block until the last processed record is checkpointed. Also handle the case
+        // where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will
+        // be null.
+        while (lastProcessedRecordSeqNumber != null
+            && !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) {
+          Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
+        }
+        LOG.info("Final checkpoint for {} before shutting down.", this);
+        shutdownInput.getCheckpointer().checkpoint();
+      } catch (Exception e) {
+        LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
+      }
+    }
+    listener.onShutdown(ssp);
+  }
+
+  String getShardId() {
+    return shardId;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("KinesisRecordProcessor: ssp %s shard %s hashCode %s", ssp, shardId, hashCode());
+  }
+}

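Note on the TERMINATE path in KinesisRecordProcessor.shutdown() above: the hand-off between the thread that receives records and the thread that commits checkpoints can be sketched in isolation. This is a minimal, self-contained sketch and not the Samza code. ShardDrainSketch, onProcessed, onCheckpointed and awaitDrained are hypothetical names, plain strings stand in for ExtendedSequenceNumber, and the timeout and interrupt handling are additions made for the example (the loop in the patch has no timeout).

```java
import java.util.Objects;

/** Hypothetical stand-in for "block until the last processed record is checkpointed". */
public class ShardDrainSketch {
  // Written by the thread that receives records.
  private volatile String lastProcessed;
  // Written by the thread that commits checkpoints.
  private volatile String lastCheckpointed;

  void onProcessed(String seqNumber) {
    lastProcessed = seqNumber;
  }

  void onCheckpointed(String seqNumber) {
    lastCheckpointed = seqNumber;
  }

  /**
   * Polls until both positions agree. Returns true when drained (or when no record was ever
   * processed), false when the timeout expires or the waiting thread is interrupted.
   */
  boolean awaitDrained(long pollMs, long timeoutMs) {
    long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    while (lastProcessed != null && !Objects.equals(lastProcessed, lastCheckpointed)) {
      if (System.nanoTime() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) throws Exception {
    ShardDrainSketch shard = new ShardDrainSketch();
    shard.onProcessed("42");
    // A second thread plays the role of the checkpointing thread.
    Thread committer = new Thread(() -> shard.onCheckpointed("42"));
    committer.start();
    System.out.println("drained: " + shard.awaitDrained(10, 2000));
    committer.join();
  }
}
```

The null check mirrors the case called out in the patch where no records were ever delivered to the processor, so there is nothing to wait for.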
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
new file mode 100644
index 0000000..72d86b9
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.List;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * Listener interface implemented by consumer to be notified when {@link KinesisRecordProcessor} receives records or
+ * is ready to shut down.
+ */
+public interface KinesisRecordProcessorListener {
+  /**
+   * Method invoked by
+   * {@link KinesisRecordProcessor#processRecords(com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput)}
+   * when the records are received by the processor.
+   * @param ssp Samza partition to which the records belong
+   * @param records List of kinesis records
+   * @param millisBehindLatest Time lag of the batch of records with respect to the tip of the stream
+   */
+  void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest);
+
+  /**
+   * Method invoked by
+   * {@link KinesisRecordProcessor#shutdown(com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput)}
+   * when the processor is ready to shut down.
+   * @param ssp Samza partition for which the shutdown is invoked
+   */
+  void onShutdown(SystemStreamPartition ssp);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
new file mode 100644
index 0000000..6afffd3
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointListener;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+import org.apache.samza.util.BlockingEnvelopeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import com.amazonaws.services.kinesis.model.Record;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * The system consumer for Kinesis, extending the {@link BlockingEnvelopeMap}.
+ *
+ * The system consumer creates a KinesisWorker per stream in its own thread by providing a RecordProcessorFactory.
+ * Kinesis Client Library (KCL) uses this factory to instantiate a KinesisRecordProcessor for each shard in the Kinesis
+ * stream. KCL pushes data records to the appropriate record processor and the processor is responsible for processing
+ * the resulting records and placing them into a blocking queue in {@link BlockingEnvelopeMap}.
+ *
+ * <pre>
+ *   {@code
+ *                                                                                Shard1  +----------------------+
+ *                                                                . --------------------> |KinesisRecordProcessor|
+ *                        Stream1                                 |               Shard2  +----------------------+
+ *                              +-------------+     +-----------------------------+       +----------------------+
+ *             .--------------->|    Worker   |---->|    RecordProcessorFactory   | ----> |KinesisRecordProcessor|
+ *             |                +-------------+     +-------------+---------------+       +----------------------+
+ *             |                                                  |               Shard3  +----------------------+
+ *             |                                                  . --------------------> |KinesisRecordProcessor|
+ *             |                                                                          +----------------------+
+ *             |          Stream2
+ *  +---------------------+     +-------------+     +-----------------------------+        +-------+
+ *  |KinesisSystemConsumer|---->|    Worker   |---->|    RecordProcessorFactory   |------->|  ...  |
+ *  +---------------------+     +-------------+     +-----------------------------+        +-------+
+ *             |
+ *             |
+ *             |
+ *             |
+ *             |                +-----------+
+ *             . -------------->|    ...    |
+ *                              +-----------+
+ *  }
+ *  </pre>
+ * Since KinesisSystemConsumer uses KCL, the checkpoint state is stored in a DynamoDB table which is maintained by KCL.
+ * KinesisSystemConsumer implements CheckpointListener to commit checkpoints via KCL.
+ */
+
+public class KinesisSystemConsumer extends BlockingEnvelopeMap implements CheckpointListener, KinesisRecordProcessorListener {
+
+  private static final int MAX_BLOCKING_QUEUE_SIZE = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemConsumer.class.getName());
+
+  private final String system;
+  private final KinesisConfig kConfig;
+  private final KinesisSystemConsumerMetrics metrics;
+  private final SSPAllocator sspAllocator;
+
+  private final Set<String> streams = new HashSet<>();
+  private final Map<SystemStreamPartition, KinesisRecordProcessor> processors = new ConcurrentHashMap<>();
+  private final List<Worker> workers = new LinkedList<>();
+
+  private ExecutorService executorService;
+
+  private volatile Exception callbackException;
+
+  public KinesisSystemConsumer(String systemName, KinesisConfig kConfig, MetricsRegistry registry) {
+    super(registry, System::currentTimeMillis, null);
+    this.system = systemName;
+    this.kConfig = kConfig;
+    this.metrics = new KinesisSystemConsumerMetrics(registry);
+    this.sspAllocator = new SSPAllocator();
+  }
+
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE);
+  }
+
+  @Override
+  protected void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+    try {
+      super.put(ssp, envelope);
+    } catch (Exception e) {
+      LOG.error("Exception while putting record. Shutting down SystemStream {}", ssp.getSystemStream(), e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public void register(SystemStreamPartition ssp, String offset) {
+    LOG.info("Register called with ssp {} and offset {}. Offset will be ignored.", ssp, offset);
+    String stream = ssp.getStream();
+    streams.add(stream);
+    sspAllocator.free(ssp);
+    super.register(ssp, offset);
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Start samza consumer for system {}.", system);
+
+    metrics.initializeMetrics(streams);
+
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("kinesis-worker-thread-" + system + "-%d")
+        .build();
+    // launch kinesis workers in separate threads, one per stream
+    executorService = Executors.newFixedThreadPool(streams.size(), namedThreadFactory);
+
+    for (String stream : streams) {
+      // KCL DynamoDB table is used for storing the state of processing. By default, the table name is the same as the
+      // application name. DynamoDB table name must be unique for a given account and region (even across different
+      // streams). So, let's create the default one with the combination of job name, job id and stream name. The table
+      // name could be changed by providing a different TableName via KCL specific config.
+      String kinesisApplicationName =
+          kConfig.get(JobConfig.JOB_NAME()) + "-" + kConfig.get(JobConfig.JOB_ID()) + "-" + stream;
+
+      Worker worker = new Worker.Builder()
+          .recordProcessorFactory(createRecordProcessorFactory(stream))
+          .config(kConfig.getKinesisClientLibConfig(system, stream, kinesisApplicationName))
+          .build();
+
+      workers.add(worker);
+
+      // run the worker on its own thread of the shared pool (one thread per stream)
+      executorService.execute(worker);
+      LOG.info("Started worker for system {} stream {}.", system, stream);
+    }
+  }
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> ssps, long timeout) throws InterruptedException {
+    if (callbackException != null) {
+      throw new SamzaException(callbackException);
+    }
+    return super.poll(ssps, timeout);
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stop samza consumer for system {}.", system);
+    workers.forEach(Worker::shutdown);
+    workers.clear();
+    executorService.shutdownNow();
+    LOG.info("Kinesis system consumer executor service for system {} is shut down.", system);
+  }
+
+  // package-private for tests
+  IRecordProcessorFactory createRecordProcessorFactory(String stream) {
+    return () -> {
+      // This code is executed in Kinesis thread context.
+      try {
+        SystemStreamPartition ssp = sspAllocator.allocate(stream);
+        KinesisRecordProcessor processor = new KinesisRecordProcessor(ssp, KinesisSystemConsumer.this);
+        KinesisRecordProcessor prevProcessor = processors.put(ssp, processor);
+        Validate.isTrue(prevProcessor == null, String.format("Adding new kinesis record processor %s while the"
+                + " previous processor %s for the same ssp %s is still active.", processor, prevProcessor, ssp));
+        return processor;
+      } catch (Exception e) {
+        callbackException = e;
+        // This exception is the result of kinesis dynamic shard splits due to which sspAllocator ran out of free ssps.
+        // Set the failed state in consumer which will eventually result in stopping the container. A manual job restart
+        // will be required at this point. After the job restart, the newly created shards will be discovered and enough
+        // ssps will be added to sspAllocator freePool.
+        throw new SamzaException(e);
+      }
+    };
+  }
+
+  @Override
+  public void onCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
+    LOG.info("onCheckpoint called with sspOffsets {}", sspOffsets);
+    sspOffsets.forEach((ssp, offset) -> {
+        KinesisRecordProcessor processor = processors.get(ssp);
+        KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
+        if (processor == null) {
+          LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
+              + " checkpoint {}.", ssp, offset);
+        } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
+          LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
+              + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
+              kinesisOffset.getShardId(), offset);
+        } else {
+          processor.checkpoint(kinesisOffset.getSeqNumber());
+        }
+      });
+  }
+
+  @Override
+  public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+    metrics.updateMillisBehindLatest(ssp.getStream(), millisBehindLatest);
+    records.forEach(record -> put(ssp, translate(ssp, record)));
+  }
+
+  @Override
+  public void onShutdown(SystemStreamPartition ssp) {
+    processors.remove(ssp);
+    sspAllocator.free(ssp);
+  }
+
+  private IncomingMessageEnvelope translate(SystemStreamPartition ssp, Record record) {
+    String shardId = processors.get(ssp).getShardId();
+    byte[] payload = new byte[record.getData().remaining()];
+
+    metrics.updateMetrics(ssp.getStream(), record);
+    record.getData().get(payload);
+    KinesisSystemConsumerOffset offset = new KinesisSystemConsumerOffset(shardId, record.getSequenceNumber());
+    return new KinesisIncomingMessageEnvelope(ssp, offset.toString(), record.getPartitionKey(),
+        payload, shardId, record.getSequenceNumber(), record.getApproximateArrivalTimestamp());
+  }
+
+}

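Note on error propagation in KinesisSystemConsumer above: the record processor factory runs on a KCL thread, so a failure there is stored in callbackException and rethrown from poll() on the Samza thread. The pattern can be sketched without Samza or KCL on the classpath. This is only an illustrative sketch: CallbackFailureSketch and its method names are hypothetical, strings stand in for envelopes, and a null record is used as an artificial trigger for the failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: record a callback-thread failure, surface it on the polling thread. */
public class CallbackFailureSketch {
  // Bounded queue, so a slow poller applies back-pressure to the producer.
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
  // volatile: written on the callback thread, read on the polling thread.
  private volatile Exception callbackException;

  /** Runs on the background (callback) thread. */
  void onCallback(String record) {
    try {
      if (record == null) {
        // Artificial failure used only to exercise the error path in this sketch.
        throw new IllegalStateException("no free partition");
      }
      queue.put(record);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      callbackException = e;
    } catch (RuntimeException e) {
      callbackException = e;
    }
  }

  /** Runs on the polling thread; rethrows any failure recorded by a callback. */
  List<String> poll(long timeoutMs) {
    if (callbackException != null) {
      throw new IllegalStateException(callbackException);
    }
    List<String> out = new ArrayList<>();
    try {
      String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
      if (first != null) {
        out.add(first);
        queue.drainTo(out);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return out;
  }

  public static void main(String[] args) {
    CallbackFailureSketch consumer = new CallbackFailureSketch();
    consumer.onCallback("r1");
    System.out.println(consumer.poll(10));
    consumer.onCallback(null);
    try {
      consumer.poll(10);
    } catch (IllegalStateException e) {
      System.out.println("poll failed: " + e.getCause().getMessage());
    }
  }
}
```

Once a failure is recorded, every later poll fails as well, which matches the intent described in the patch comments: the container stops and a manual restart is needed.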
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
new file mode 100644
index 0000000..13296ca
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Kinesis system consumer related checkpoint information that is stored in the IncomingMessageEnvelope offset.
+ *
+ * It contains the following metadata:
+ * <ul>
+ *   <li> shardId: Kinesis stream shardId.
+ *   <li> seqNumber: sequence number in the above shard.
+ * </ul>
+ *
+ * Please note that the source of truth for checkpointing is the AWS DynamoDB table corresponding to the application.
+ * The offset that is stored in Samza checkpoint topic is not used.
+ */
+public class KinesisSystemConsumerOffset {
+
+  @JsonProperty("shardId")
+  private String shardId;
+  @JsonProperty("seqNumber")
+  private String seqNumber;
+
+  @JsonCreator
+  KinesisSystemConsumerOffset(@JsonProperty("shardId") String shardId,
+      @JsonProperty("seqNumber") String seqNumber) {
+    this.shardId = shardId;
+    this.seqNumber = seqNumber;
+  }
+
+  String getShardId() {
+    return shardId;
+  }
+
+  String getSeqNumber() {
+    return seqNumber;
+  }
+
+  static KinesisSystemConsumerOffset parse(String metadata) {
+    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
+    byte[] bytes;
+    try {
+      bytes = metadata.getBytes("UTF-8");
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+    return jsonSerde.fromBytes(bytes);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public String toString() {
+    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
+    return new String(jsonSerde.toBytes(this), java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof KinesisSystemConsumerOffset)) {
+      return false;
+    }
+
+    String thatShardId = ((KinesisSystemConsumerOffset) o).getShardId();
+    if (!(shardId == null ? thatShardId == null : shardId.equals(thatShardId))) {
+      return false;
+    }
+    String thatSeqNumber = ((KinesisSystemConsumerOffset) o).getSeqNumber();
+    if (!(seqNumber == null ? thatSeqNumber == null : seqNumber.equals(thatSeqNumber))) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = shardId == null ? 0 : shardId.hashCode();
+    result = 31 * result + (seqNumber == null ? 0 : seqNumber.hashCode());
+    return result;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
new file mode 100644
index 0000000..6caf760
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+
+/**
+ * Thrown when SSPAllocator is unable to allocate an SSP.
+ */
+public class NoAvailablePartitionException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public NoAvailablePartitionException(String message) {
+    super(message);
+  }
+
+  public NoAvailablePartitionException(String message, Exception e) {
+    super(message, e);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
new file mode 100644
index 0000000..4b7cff8
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SSPAllocator is responsible for assigning Samza SystemStreamPartitions (SSPs). It provides two APIs:
+ * <ul>
+ *   <li> allocate: Given a stream, returns a free ssp.
+ *   <li> free: Adds ssp back to the free pool.
+ * </ul>
+ * A free (unallocated) ssp is returned for every allocate request. When there is no available ssp to allocate,
+ * the allocator throws NoAvailablePartitionException. The allocator can run out of free ssps as a result of dynamic
+ * shard splits.
+ */
+class SSPAllocator {
+  private static final Logger LOG = LoggerFactory.getLogger(SSPAllocator.class.getName());
+
+  private final Map<String, Set<SystemStreamPartition>> availableSsps = new HashMap<>();
+
+  synchronized SystemStreamPartition allocate(String stream) throws NoAvailablePartitionException {
+    Validate.isTrue(availableSsps.get(stream) != null,
+        String.format("availableSsps is null for stream %s", stream));
+
+    if (availableSsps.get(stream).isEmpty()) {
+      // The system consumer records this exception and rethrows it in the subsequent poll.
+      throw new NoAvailablePartitionException(String.format("More shards detected for stream %s than initially"
+          + " registered. Could be the result of dynamic resharding.", stream));
+    }
+
+    SystemStreamPartition ssp = availableSsps.get(stream).iterator().next();
+    availableSsps.get(stream).remove(ssp);
+
+    LOG.info("Number of unassigned partitions for system-stream {} is {}.", ssp.getSystemStream(),
+        availableSsps.get(ssp.getStream()).size());
+    return ssp;
+  }
+
+  synchronized void free(SystemStreamPartition ssp) {
+    boolean success = availableSsps.computeIfAbsent(ssp.getStream(), p -> new HashSet<>()).add(ssp);
+    Validate.isTrue(success, String.format("Ssp %s is already in free pool.", ssp));
+
+    LOG.info("Number of unassigned partitions for system-stream {} is {}.", ssp.getSystemStream(),
+        availableSsps.get(ssp.getStream()).size());
+  }
+}

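Note on SSPAllocator above: the allocate/free contract (a per-stream pool of free partitions, an error when the pool is empty, an error on double free) can be exercised with a small stand-alone sketch. PartitionPoolSketch is a hypothetical name; integers stand in for SystemStreamPartition and IllegalStateException stands in for NoAvailablePartitionException and the Validate checks.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a per-stream pool of free partition ids. */
public class PartitionPoolSketch {
  private final Map<String, Set<Integer>> freePartitions = new HashMap<>();

  /** Returns a partition to the pool; freeing the same partition twice is an error. */
  synchronized void free(String stream, int partition) {
    boolean added = freePartitions.computeIfAbsent(stream, s -> new HashSet<>()).add(partition);
    if (!added) {
      throw new IllegalStateException("Partition " + partition + " is already in the free pool.");
    }
  }

  /** Hands out any free partition of the stream, or fails when none is left. */
  synchronized int allocate(String stream) {
    Set<Integer> pool = freePartitions.get(stream);
    if (pool == null || pool.isEmpty()) {
      throw new IllegalStateException("No free partition for stream " + stream);
    }
    Iterator<Integer> it = pool.iterator();
    int partition = it.next();
    it.remove();
    return partition;
  }

  public static void main(String[] args) {
    PartitionPoolSketch pool = new PartitionPoolSketch();
    pool.free("orders", 0);
    pool.free("orders", 1);
    int first = pool.allocate("orders");
    int second = pool.allocate("orders");
    System.out.println("allocated " + first + " and " + second);
    try {
      // A third shard (for example after a split) finds no free partition.
      pool.allocate("orders");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

As in the patch, registration is modeled as a call to free(), so the pool size equals the number of partitions registered at startup.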
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
new file mode 100644
index 0000000..2f42981
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * KinesisSystemConsumerMetrics class has per-stream metrics and aggregate metrics across kinesis consumers
+ */
+
+public class KinesisSystemConsumerMetrics {
+
+  private final MetricsRegistry registry;
+
+  // Aggregate metrics across all kinesis system consumers
+  private static Counter aggEventReadRate = null;
+  private static Counter aggEventByteReadRate = null;
+  private static SamzaHistogram aggReadLatency = null;
+  private static SamzaHistogram aggMillisBehindLatest = null;
+
+  // Per-stream metrics
+  private Map<String, Counter> eventReadRates;
+  private Map<String, Counter> eventByteReadRates;
+  private Map<String, SamzaHistogram> readLatencies;
+  private Map<String, SamzaHistogram> millisBehindLatest;
+
+  private static final Object LOCK = new Object();
+
+  private static final String AGGREGATE = "aggregate";
+  private static final String EVENT_READ_RATE = "eventReadRate";
+  private static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
+  private static final String READ_LATENCY = "readLatency";
+  private static final String MILLIS_BEHIND_LATEST = "millisBehindLatest";
+
+  public KinesisSystemConsumerMetrics(MetricsRegistry registry) {
+    this.registry = registry;
+  }
+
+  public void initializeMetrics(Set<String> streamNames) {
+    eventReadRates = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
+    eventByteReadRates = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
+    readLatencies = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
+    millisBehindLatest = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(),
+            x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST)));
+
+    // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
+    synchronized (LOCK) {
+      if (aggEventReadRate == null) {
+        aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE);
+        aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE);
+        aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY);
+        aggMillisBehindLatest = new SamzaHistogram(registry, AGGREGATE, MILLIS_BEHIND_LATEST);
+      }
+    }
+  }
+
+  public void updateMillisBehindLatest(String stream, Long millisBehindLatest) {
+    this.millisBehindLatest.get(stream).update(millisBehindLatest);
+    aggMillisBehindLatest.update(millisBehindLatest);
+  }
+
+  public void updateMetrics(String stream, Record record) {
+    eventReadRates.get(stream).inc();
+    aggEventReadRate.inc();
+
+    long recordSize = record.getData().array().length + record.getPartitionKey().length();
+    eventByteReadRates.get(stream).inc(recordSize);
+    aggEventByteReadRate.inc(recordSize);
+
+    long latencyMs = Duration.between(record.getApproximateArrivalTimestamp().toInstant(), Instant.now()).toMillis();
+    readLatencies.get(stream).update(latencyMs);
+    aggReadLatency.update(latencyMs);
+  }
+}

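Note on the read-latency calculation in updateMetrics above: java.time.Duration.between(start, end) is negative when end precedes start, so the argument order decides the sign of the value fed into the latency histogram. A minimal standalone sketch, assuming only the JDK; the class LatencySketch and its latencyMs helper are hypothetical names used for illustration and are not part of this commit:

```java
import java.time.Duration;
import java.time.Instant;

public class LatencySketch {

  // Milliseconds elapsed from the record's arrival time until "now".
  // Passing the earlier instant first yields a non-negative duration.
  public static long latencyMs(Instant arrival, Instant now) {
    return Duration.between(arrival, now).toMillis();
  }

  public static void main(String[] args) {
    Instant arrival = Instant.parse("2017-11-29T19:00:00Z");
    Instant now = arrival.plusMillis(1500);

    // Arrival first, then now: positive latency.
    System.out.println(latencyMs(arrival, now));
    // Arguments reversed: same magnitude, negative sign.
    System.out.println(Duration.between(now, arrival).toMillis());
  }
}
```

With the arrival timestamp as the first argument the result is the time the record spent waiting before it was read, which is the quantity a read-latency metric is usually meant to capture.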
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
new file mode 100644
index 0000000..29964dc
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.metrics;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+
+
+class SamzaHistogram {
+
+  private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
+  private final MetricsRegistry registry;
+  private final Histogram histogram;
+  private final List<Double> percentiles;
+  private final Map<Double, Gauge<Double>> gauges;
+
+  SamzaHistogram(MetricsRegistry registry, String group, String name) {
+    this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
+  }
+
+  SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
+    this.registry = registry;
+    this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
+    this.percentiles = percentiles;
+    this.gauges = percentiles.stream()
+        .filter(x -> x > 0 && x <= 100)
+        .collect(
+            Collectors.toMap(Function.identity(), x -> this.registry.newGauge(group, name + "_" + String.valueOf(x), 0D)));
+  }
+
+  synchronized void update(long value) {
+    histogram.update(value);
+    Snapshot values = histogram.getSnapshot();
+    gauges.forEach((x, gauge) -> gauge.set(values.getValue(x / 100)));
+  }
+}

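Note on SamzaHistogram above: each configured percentile is exposed through its own gauge, so every gauge needs a name that is unique within its group, and percentiles outside (0, 100] get no gauge at all. A minimal sketch of that naming and filtering, assuming only the JDK; the class PercentileGaugeNames and its gaugeNames method are hypothetical and stand in for the registry calls, which are not reproduced here:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PercentileGaugeNames {

  // Builds one gauge name per valid percentile by appending the percentile
  // to the metric name. Percentiles outside (0, 100] are skipped.
  public static Map<Double, String> gaugeNames(String name, List<Double> percentiles) {
    Map<Double, String> names = new LinkedHashMap<>();
    for (Double p : percentiles) {
      if (p > 0 && p <= 100) {
        names.put(p, name + "_" + p);
      }
    }
    return names;
  }

  public static void main(String[] args) {
    // 150 is out of range, so only two entries are produced.
    Map<Double, String> names = gaugeNames("readLatency", Arrays.asList(50D, 99D, 150D));
    names.forEach((p, n) -> System.out.println(p + " -> " + n));
  }
}
```

Because out-of-range percentiles have no entry in the map, any later update loop should iterate over the map itself rather than over the original percentile list, otherwise a lookup for a skipped percentile returns null.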
http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
new file mode 100644
index 0000000..93887ed
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestKinesisAWSCredentialsProvider {
+
+  @Test
+  public void testCredentialsProviderWithNonNullKeys() {
+    String accessKey = "accessKey";
+    String secretKey = "secretKey";
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(accessKey, secretKey);
+    assertEquals(accessKey, credProvider.getCredentials().getAWSAccessKeyId());
+    assertEquals(secretKey, credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullAccessKey() {
+    String secretKey = "secretKey";
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(null, secretKey);
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullSecretKey() {
+    String accessKey = "accessKey";
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(accessKey, null);
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullKeys() {
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(null, null);
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
new file mode 100644
index 0000000..56e4810
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+
+import static org.junit.Assert.*;
+
+
+public class TestKinesisConfig {
+  @Test
+  public void testGetKinesisStreams() {
+    Map<String, String> kv = new HashMap<>();
+    kv.put("systems.kinesis.streams.kinesis-stream1.prop1", "value1");
+    kv.put("systems.kinesis.streams.kinesis-stream1.prop2", "value2");
+    kv.put("systems.kinesis.streams.kinesis-stream2.prop1", "value3");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    Set<String> streams = kConfig.getKinesisStreams("kinesis");
+    assertEquals(2, streams.size());
+  }
+
+  @Test
+  public void testKinesisConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String stream = "kinesis-stream";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+    String ssConfigPrefix = String.format("systems.%s.streams.%s.", system, stream);
+
+    kv.put("sensitive." + ssConfigPrefix + "aws.secretKey", "secretKey");
+    kv.put(systemConfigPrefix + "aws.region", "us-east-1");
+    kv.put(ssConfigPrefix + "aws.accessKey", "accessKey");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    assertEquals("us-east-1", kConfig.getRegion(system, stream).getName());
+    assertEquals("accessKey", kConfig.getStreamAccessKey(system, stream));
+    assertEquals("secretKey", kConfig.getStreamSecretKey(system, stream));
+  }
+
+  @Test
+  public void testAwsClientConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+
+    // Aws Client Configs
+    kv.put(systemConfigPrefix + "aws.clientConfig.ProxyHost", "hostName");
+    kv.put(systemConfigPrefix + "aws.clientConfig.ProxyPort", "8080");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    assertEquals("hostName", kConfig.getAWSClientConfig(system).getProxyHost());
+    assertEquals(8080, kConfig.getAWSClientConfig(system).getProxyPort());
+  }
+
+  @Test
+  public void testKclConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String stream = "kinesis-stream";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+
+    // region config is required for setting kcl config.
+    kv.put(systemConfigPrefix + "aws.region", "us-east-1");
+
+    // Kcl Configs
+    kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table");
+    kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100");
+    kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true");
+    kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON");
+    // override one of the Kcl configs for kinesis-stream1
+    kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    KinesisClientLibConfiguration kclConfig = kConfig.getKinesisClientLibConfig(system, stream, "sample-app");
+
+    assertEquals("sample-table", kclConfig.getTableName());
+    assertEquals(100, kclConfig.getMaxRecords());
+    assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList());
+    assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream());
+
+    // verify that the overridden config is applied for kinesis-stream1
+    kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app");
+    assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream());
+  }
+
+  @Test
+  public void testGetKCLConfigWithUnknownConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    kv.put("systems.kinesis.aws.region", "us-east-1");
+    kv.put("systems.kinesis.streams.kinesis-stream.aws.kcl.random", "value");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    // Should not throw any exception and just ignore the unknown configs.
+    kConfig.getKinesisClientLibConfig("kinesis", "kinesis-stream", "sample-app");
+  }
+}

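Note on testKclConfigs above: the test expects a stream-scoped key (systems.<system>.streams.<stream>.<key>) to take precedence over the system-scoped key (systems.<system>.<key>). That precedence can be sketched with a plain map lookup. This is an illustration only: the class ConfigFallbackSketch and its lookup method are hypothetical, and the sketch makes no claim about how KinesisConfig implements the behaviour, since that class is not shown in this part of the message.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigFallbackSketch {

  // Returns the stream-scoped value if present, otherwise the system-scoped
  // value, otherwise null.
  public static String lookup(Map<String, String> cfg, String system, String stream, String key) {
    String streamKey = String.format("systems.%s.streams.%s.%s", system, stream, key);
    String systemKey = String.format("systems.%s.%s", system, key);
    String value = cfg.get(streamKey);
    return value != null ? value : cfg.get(systemKey);
  }

  public static void main(String[] args) {
    Map<String, String> cfg = new HashMap<>();
    cfg.put("systems.kinesis.aws.kcl.InitialPositionInStream", "TRIM_HORIZON");
    cfg.put("systems.kinesis.streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST");

    String key = "aws.kcl.InitialPositionInStream";
    // kinesis-stream1 has its own override.
    System.out.println(lookup(cfg, "kinesis", "kinesis-stream1", key));
    // kinesis-stream falls back to the system-level value.
    System.out.println(lookup(cfg, "kinesis", "kinesis-stream", key));
  }
}
```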

[8/8] samza git commit: Merge branch 'master' into 0.14.0

Posted by xi...@apache.org.
Merge branch 'master' into 0.14.0


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/04ee7fee
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/04ee7fee
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/04ee7fee

Branch: refs/heads/0.14.0
Commit: 04ee7fee60a79e073e634d08ba5be881376e1bd0
Parents: fae73b2 12e61e9
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Authored: Wed Nov 29 11:35:37 2017 -0800
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Wed Nov 29 11:35:37 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  55 +++-
 .../versioned/yarn/yarn-host-affinity.md        |   2 +-
 .../versioned/hello-samza-high-level-code.md    |  87 +++---
 docs/startup/preview/index.md                   |  67 +++--
 .../kinesis/KinesisAWSCredentialsProvider.java  |  69 +++++
 .../samza/system/kinesis/KinesisConfig.java     | 287 ++++++++++++++++++
 .../system/kinesis/KinesisSystemAdmin.java      | 124 ++++++++
 .../system/kinesis/KinesisSystemFactory.java    |  87 ++++++
 .../KinesisIncomingMessageEnvelope.java         |  62 ++++
 .../consumer/KinesisRecordProcessor.java        | 208 +++++++++++++
 .../KinesisRecordProcessorListener.java         |  51 ++++
 .../kinesis/consumer/KinesisSystemConsumer.java | 256 ++++++++++++++++
 .../consumer/KinesisSystemConsumerOffset.java   | 107 +++++++
 .../consumer/NoAvailablePartitionException.java |  38 +++
 .../system/kinesis/consumer/SSPAllocator.java   |  73 +++++
 .../metrics/KinesisSystemConsumerMetrics.java   | 106 +++++++
 .../system/kinesis/metrics/SamzaHistogram.java  |  63 ++++
 .../TestKinesisAWSCredentialsProvider.java      |  60 ++++
 .../samza/system/kinesis/TestKinesisConfig.java | 132 ++++++++
 .../kinesis/TestKinesisSystemFactory.java       | 115 +++++++
 .../consumer/TestKinesisRecordProcessor.java    | 301 +++++++++++++++++++
 .../consumer/TestKinesisSystemConsumer.java     | 270 +++++++++++++++++
 .../TestKinesisSystemConsumerOffset.java        |  48 +++
 .../kinesis/consumer/TestSSPAllocator.java      | 127 ++++++++
 .../apache/samza/config/JavaStorageConfig.java  |   5 +-
 .../samza/operators/impl/OperatorImplGraph.java |  14 +-
 .../operators/impl/PartitionByOperatorImpl.java |   3 +-
 .../samza/config/TestJavaStorageConfig.java     |  13 +
 settings.gradle                                 |   5 +-
 29 files changed, 2740 insertions(+), 95 deletions(-)
----------------------------------------------------------------------