You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/11/28 21:12:14 UTC

[1/2] samza git commit: SAMZA-1515; Implement a consumer for Kinesis

Repository: samza
Updated Branches:
  refs/heads/master 5e68d621a -> 9961023f7


http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
new file mode 100644
index 0000000..5b3b335
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisSystemFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link KinesisSystemFactory}. The getAdmin tests below are currently {@code @Ignore}d; they expect
+ * {@code getAdmin} to throw a {@link ConfigException} for a config with a different SSP grouper factory, with
+ * broadcast inputs, or with a bootstrap stream.
+ */
+public class TestKinesisSystemFactory {
+  // Format string (not a regex) for the system factory config key; %s is replaced by the system name.
+  private static final String SYSTEM_FACTORY_FORMAT = "systems.%s.samza.factory";
+  private static final String KINESIS_SYSTEM_FACTORY = KinesisSystemFactory.class.getName();
+
+  /** The factory must return a non-null consumer that is a different object from the admin. */
+  @Test
+  public void testGetConsumer() {
+    String systemName = "test";
+    Config config = buildKinesisConsumerConfig(systemName);
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    MetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
+    Object consumer = factory.getConsumer(systemName, config, metricsRegistry);
+    Assert.assertNotNull("Factory returned a null consumer.", consumer);
+    Assert.assertNotSame(consumer, factory.getAdmin(systemName, config));
+  }
+
+  /** getAdmin is expected to reject an SSP grouper other than AllSspToSingleTaskGrouperFactory. */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithIncorrectSspGrouper() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory");
+    factory.getAdmin(systemName, config);
+  }
+
+  /** getAdmin is expected to reject a config that declares broadcast inputs. */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBroadcastStreams() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory",
+        "test.stream#0");
+    factory.getAdmin(systemName, config);
+  }
+
+  /** getAdmin is expected to reject a config that marks one of the system's streams as bootstrap. */
+  @Ignore
+  @Test(expected = ConfigException.class)
+  public void testGetAdminWithBootstrapStream() {
+    String systemName = "test";
+    KinesisSystemFactory factory = new KinesisSystemFactory();
+    Config config = buildKinesisConsumerConfig(systemName,
+        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory",
+        null,
+        "kinesis-stream"
+        );
+    factory.getAdmin(systemName, config);
+  }
+
+  /** Builds a config for {@code systemName} using {@link AllSspToSingleTaskGrouperFactory} as the SSP grouper. */
+  private static Config buildKinesisConsumerConfig(String systemName) {
+    return buildKinesisConsumerConfig(systemName, AllSspToSingleTaskGrouperFactory.class.getCanonicalName());
+  }
+
+  /** Builds a config with the given SSP grouper factory and no broadcast or bootstrap streams. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, null);
+  }
+
+  /** Builds a config with the given SSP grouper factory and broadcast inputs, and no bootstrap stream. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue) {
+    return buildKinesisConsumerConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue, null);
+  }
+
+  /** Wraps {@link #buildSamzaKinesisSystemConfig} in a {@link MapConfig}. */
+  private static Config buildKinesisConsumerConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    Map<String, String> props = buildSamzaKinesisSystemConfig(systemName, sspGrouperFactory, broadcastStreamConfigValue,
+        bootstrapStreamName);
+    return new MapConfig(props);
+  }
+
+  /**
+   * Returns the raw config properties. It always contains the factory class for {@code systemName} and the SSP
+   * grouper factory. It adds {@code task.broadcast.inputs} and a {@code samza.bootstrap=true} flag for
+   * {@code bootstrapStreamName} only when those arguments are non-null and non-empty.
+   */
+  private static Map<String, String> buildSamzaKinesisSystemConfig(String systemName, String sspGrouperFactory,
+      String broadcastStreamConfigValue, String bootstrapStreamName) {
+    Map<String, String> result = new HashMap<>();
+    result.put(String.format(SYSTEM_FACTORY_FORMAT, systemName), KINESIS_SYSTEM_FACTORY);
+    result.put("job.systemstreampartition.grouper.factory", sspGrouperFactory);
+    if (broadcastStreamConfigValue != null && !broadcastStreamConfigValue.isEmpty()) {
+      result.put("task.broadcast.inputs", broadcastStreamConfigValue);
+    }
+    if (bootstrapStreamName != null && !bootstrapStreamName.isEmpty()) {
+      result.put("systems." + systemName + ".streams." + bootstrapStreamName + ".samza.bootstrap", "true");
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
new file mode 100644
index 0000000..6379fcc
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link KinesisRecordProcessor}. Each test drives a processor through {@code initialize},
+ * {@code processRecords}, {@code checkpoint} and {@code shutdown}. The KCL input objects are Mockito mocks, and the
+ * processor's callbacks are observed through a {@link KinesisRecordProcessorListener}.
+ *
+ * <p>The static helpers {@link #generateRecords}, {@link #shutDownProcessor} and {@link #getCheckpointer} are
+ * package-private because {@code TestKinesisSystemConsumer} reuses them.
+ */
+public class TestKinesisRecordProcessor {
+  // One second longer than KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS, so each await below
+  // spans at least one such interval.
+  private static final long MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS =
+      KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS + 1000;
+
+  @Test
+  public void testLifeCycleWithEvents() throws InterruptedException, ShutdownException, InvalidStateException,
+                                               NoSuchFieldException, IllegalAccessException {
+    testLifeCycleHelper(5);
+  }
+
+  @Test
+  public void testLifeCycleWithNoEvents() throws InterruptedException, ShutdownException, InvalidStateException,
+                                                 NoSuchFieldException, IllegalAccessException {
+    testLifeCycleHelper(0);
+  }
+
+  /**
+   * Runs the sequence initialize, processRecords, checkpoint of the last record (only when there are records), and
+   * shutdown(ZOMBIE). It checks that the listener is told about the records (if any) and about the shutdown.
+   *
+   * @param numRecords number of records passed to the processor in a single {@code processRecords} call
+   */
+  private void testLifeCycleHelper(int numRecords) throws InterruptedException, ShutdownException,
+                                                          InvalidStateException, NoSuchFieldException,
+                                                          IllegalAccessException {
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+    KinesisRecordProcessor processor =
+        createAndInitProcessor(createListener(receivedRecordsLatch, receivedShutdownLatch));
+
+    // Call processRecords on the processor
+    List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+    // Verify there is a receivedRecords call to listener.
+    Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
+
+    if (numRecords > 0) {
+      // Call checkpoint on last record
+      processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+    }
+
+    // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+    shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+    Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
+  }
+
+  /**
+   * Test the scenario where a processor instance is created for a shard and, while it is processing records, the
+   * shard gets re-assigned to the same consumer. This results in a new processor instance owning the shard, and this
+   * instance could receive checkpoint calls for records that were processed by the old processor instance. This test
+   * covers the case where the new instance receives such a checkpoint call after it has finished initialization and
+   * before it has processed any records.
+   */
+  @Test
+  public void testCheckpointAfterInit() throws InterruptedException, ShutdownException, InvalidStateException,
+                                               NoSuchFieldException, IllegalAccessException {
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    // This test ignores onReceiveRecords: countDown() on a zero-count latch does nothing.
+    KinesisRecordProcessor processor =
+        createAndInitProcessor(createListener(new CountDownLatch(0), receivedShutdownLatch));
+
+    // Call checkpoint. This checkpoint could originally have been meant for an older processor instance of the same
+    // shard, but due to reassignment a new processor instance was created.
+    processor.checkpoint("1234567");
+
+    // Call shutdown (with ZOMBIE reason) on processor and verify that the processor calls shutdown on the listener.
+    shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+    Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
+  }
+
+  @Test
+  public void testShutdownDuringReshardWithEvents() throws InterruptedException, ShutdownException,
+                                                           InvalidStateException, NoSuchFieldException,
+                                                           IllegalAccessException {
+    testShutdownDuringReshardHelper(5);
+  }
+
+  @Test
+  public void testShutdownDuringReshardWithNoEvents() throws InterruptedException, ShutdownException,
+                                                             InvalidStateException, NoSuchFieldException,
+                                                             IllegalAccessException {
+    testShutdownDuringReshardHelper(0);
+  }
+
+  /**
+   * Shuts the processor down with reason TERMINATE (the reshard scenario these tests simulate). It checks that the
+   * listener is told about the shutdown only after the last record has been checkpointed. With no records, the
+   * listener must be told right away.
+   *
+   * @param numRecords number of records passed to the processor; must be 0 or at least 2
+   */
+  private void testShutdownDuringReshardHelper(int numRecords)
+      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
+             IllegalAccessException {
+    final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
+    final CountDownLatch receivedRecordsLatch = new CountDownLatch(numRecords > 0 ? 1 : 0);
+    KinesisRecordProcessor processor =
+        createAndInitProcessor(createListener(receivedRecordsLatch, receivedShutdownLatch));
+
+    // Call processRecords on the processor
+    List<Record> records = generateRecords(numRecords, Collections.singletonList(processor)).get(processor);
+
+    // Verify there is a receivedRecords call to listener.
+    Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
+
+    // Shutdown (with TERMINATE reason) may block until the final checkpoint arrives, so run it on its own thread.
+    // Any exception it throws is captured, so that it is reported here instead of being lost in the thread.
+    AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+    Thread shutdownThread = new Thread(() -> {
+        try {
+          shutDownProcessor(processor, ShutdownReason.TERMINATE);
+        } catch (Throwable t) {
+          shutdownFailure.set(t);
+        }
+      });
+    shutdownThread.start();
+
+    try {
+      if (numRecords == 0) {
+        // If there are no records, the processor should shutdown immediately.
+        Assert.assertTrue("Unable to shutdown processor.",
+            receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+      } else {
+        Assert.assertFalse("Processor shutdown too early.",
+            receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+        // Checkpoint the last but one record: the processor should still not call shutdown on the listener.
+        processor.checkpoint(records.get(records.size() - 2).getSequenceNumber());
+        Assert.assertFalse("Processor shutdown too early.",
+            receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+
+        // Checkpoint the last record: now the processor should call shutdown on the listener.
+        processor.checkpoint(records.get(records.size() - 1).getSequenceNumber());
+        Assert.assertTrue("Unable to shutdown processor.",
+            receivedShutdownLatch.await(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS, TimeUnit.MILLISECONDS));
+      }
+    } finally {
+      // Don't leak the shutdown thread into other tests. If the thread failed, report its exception: the latch then
+      // never counts down, and the timeout assertion above would otherwise hide the real cause.
+      shutdownThread.join(MAX_WAIT_TIME_SHUTDOWN_RECEIVED_MS);
+      Throwable failure = shutdownFailure.get();
+      if (failure != null) {
+        throw new AssertionError("Processor shutdown thread failed.", failure);
+      }
+    }
+  }
+
+  /**
+   * Returns a listener that counts down {@code recordsLatch} on each onReceiveRecords call and
+   * {@code shutdownLatch} on each onShutdown call.
+   */
+  private static KinesisRecordProcessorListener createListener(CountDownLatch recordsLatch,
+      CountDownLatch shutdownLatch) {
+    return new KinesisRecordProcessorListener() {
+      @Override
+      public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+        recordsLatch.countDown();
+      }
+
+      @Override
+      public void onShutdown(SystemStreamPartition ssp) {
+        shutdownLatch.countDown();
+      }
+    };
+  }
+
+  /**
+   * Creates a processor for partition 0 of stream "stream" in system "kinesis". It then initializes the processor
+   * for shard "shard-0000" at extended sequence number "0000".
+   */
+  private static KinesisRecordProcessor createAndInitProcessor(KinesisRecordProcessorListener listener) {
+    KinesisRecordProcessor processor =
+        new KinesisRecordProcessor(new SystemStreamPartition("kinesis", "stream", new Partition(0)), listener);
+    ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+    InitializationInput initializationInput =
+        new InitializationInput().withShardId("shard-0000").withExtendedSequenceNumber(seqNum);
+    processor.initialize(initializationInput);
+    return processor;
+  }
+
+  /**
+   * Calls processRecords once on each processor with {@code numRecordsPerShard} freshly created records. Each call
+   * gets a mocked ProcessRecordsInput whose checkpointer is a Mockito mock with no-op checkpoint methods.
+   *
+   * @return the records passed to each processor, keyed by processor
+   */
+  static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
+      List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
+    Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
+    processors.forEach(processor -> {
+        try {
+          // Create records and call process records
+          IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
+          doNothing().when(checkpointer).checkpoint(anyString());
+          doNothing().when(checkpointer).checkpoint();
+          ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
+          when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
+          when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
+          List<Record> inputRecords = createRecords(numRecordsPerShard);
+          processorRecordMap.put(processor, inputRecords);
+          when(processRecordsInput.getRecords()).thenReturn(inputRecords);
+          processor.processRecords(processRecordsInput);
+        } catch (ShutdownException | InvalidStateException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+    return processorRecordMap;
+  }
+
+  /**
+   * Calls shutdown on {@code processor} with a mocked ShutdownInput carrying {@code reason}. The input's checkpointer
+   * is the one already stored in the processor's private {@code checkpointer} field. Any exception is rethrown
+   * wrapped in a RuntimeException.
+   */
+  static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
+    try {
+      ShutdownInput shutdownInput = Mockito.mock(ShutdownInput.class);
+      when(shutdownInput.getShutdownReason()).thenReturn(reason);
+      when(shutdownInput.getCheckpointer()).thenReturn(getCheckpointer(processor));
+      processor.shutdown(shutdownInput);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /** Reads the processor's private {@code checkpointer} field via reflection. */
+  static IRecordProcessorCheckpointer getCheckpointer(KinesisRecordProcessor processor)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = processor.getClass().getDeclaredField("checkpointer");
+    f.setAccessible(true);
+    return (IRecordProcessorCheckpointer) f.get(processor);
+  }
+
+  /**
+   * Creates {@code numRecords} records, each with a random partition key and the current date as arrival timestamp.
+   * Sequence numbers are "0001", "0006", "0011", ... (5 * i + 1, zero-padded to at least 4 digits).
+   */
+  private static List<Record> createRecords(int numRecords) {
+    List<Record> records = new ArrayList<>(numRecords);
+    Random rand = new Random();
+
+    for (int i = 0; i < numRecords; i++) {
+      String dataStr = "testData-" + System.currentTimeMillis();
+      ByteBuffer data = ByteBuffer.wrap(dataStr.getBytes(StandardCharsets.UTF_8));
+      String key = String.format("partitionKey-%d", rand.nextLong());
+      String seqNum = String.format("%04d", 5 * i + 1);
+      Record record = new Record()
+          .withData(data)
+          .withPartitionKey(key)
+          .withSequenceNumber(seqNum)
+          .withApproximateArrivalTimestamp(new Date());
+      records.add(record);
+    }
+    return records;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
new file mode 100644
index 0000000..ade02ac
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+
+import static org.apache.samza.system.kinesis.consumer.TestKinesisRecordProcessor.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests KinesisSystemConsumer and KinesisRecordProcessor together. Records handed to the processors created by the
+ * consumer's record-processor factory must come out of {@code consumer.poll}. Checkpoints and shutdowns must then
+ * reach the right processor.
+ */
+public class TestKinesisSystemConsumer {
+  private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string
+  // Upper bound on consumer.poll calls in readEvents, so that missing events fail the test instead of hanging it.
+  private static final int MAX_POLL_ATTEMPTS = 10;
+  private static final long POLL_TIMEOUT_MS = Duration.ofSeconds(1).toMillis();
+
+  @Test
+  public void testProcessRecords() throws InterruptedException, ShutdownException, InvalidStateException,
+                                          NoSuchFieldException, IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    int numShards = 2;
+    int numRecordsPerShard = 5;
+
+    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
+  }
+
+  @Test
+  public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, ShutdownException,
+                                                             InvalidStateException, NoSuchFieldException,
+                                                             IllegalAccessException {
+    String system = "kinesis";
+    String stream = "stream";
+    int numShards = 1;
+    int numRecordsPerShard = 0;
+
+    testProcessRecordsHelper(system, stream, numShards, numRecordsPerShard);
+  }
+
+  /**
+   * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards
+   * 1. Creation of record processors.
+   * 2. Initialization of record processors.
+   * 3. Processing records via record processors.
+   * 4. Calling checkpoint on record processors.
+   * 5. Shutting down (due to re-assignment or lease expiration) record processors.
+   */
+  private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
+      throws InterruptedException, ShutdownException, InvalidStateException,
+             NoSuchFieldException, IllegalAccessException {
+
+    KinesisConfig kConfig = new KinesisConfig(new MapConfig());
+    // Create consumer
+    KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry());
+    initializeMetrics(consumer, stream);
+
+    List<SystemStreamPartition> ssps = new LinkedList<>();
+    IntStream.range(0, numShards)
+        .forEach(p -> ssps.add(new SystemStreamPartition(system, stream, new Partition(p))));
+    ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET));
+
+    // Create Kinesis record processor factory
+    IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream);
+
+    // Create and initialize Kinesis record processor
+    Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards);
+    List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values());
+
+    // Generate records to Kinesis record processor
+    Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList);
+
+    // Read the events of all shards from the BEM queue
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages =
+        readEvents(new HashSet<>(ssps), consumer, numShards * numRecordsPerShard);
+    if (numRecordsPerShard == 0) {
+      // No input records and hence no messages
+      Assert.assertEquals(0, messages.size());
+      return;
+    }
+    Assert.assertEquals(numShards, messages.size());
+
+    Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer);
+    ssps.forEach(ssp -> {
+        try {
+          KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
+
+          // Verify that the read messages are received in order and are the same as input records
+          List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+          Assert.assertEquals(numRecordsPerShard, envelopes.size());
+          List<Record> inputRecords = inputRecordMap.get(processor);
+          verifyRecords(envelopes, inputRecords, processor.getShardId());
+
+          // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+          IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+          consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+          ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+          verify(getCheckpointer(processor)).checkpoint(argument.capture());
+          Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
+
+          // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
+          shutDownProcessor(processor, ShutdownReason.ZOMBIE);
+          Assert.assertFalse(sspToProcessorMap.containsValue(processor));
+          Assert.assertTrue(isSspAvailable(consumer, ssp));
+        } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+  }
+
+  /**
+   * Creates {@code numShards} processors from {@code factory}. Each one is initialized for shard "shard-%05d" at
+   * extended sequence number "0000".
+   *
+   * @return the processors keyed by shard id
+   */
+  private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) {
+    Map<String, KinesisRecordProcessor> processorMap = new HashMap<>();
+    IntStream.range(0, numShards)
+        .forEach(p -> {
+            String shardId = String.format("shard-%05d", p);
+            // Create Kinesis processor
+            KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor();
+
+            // Initialize the shard
+            ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000");
+            InitializationInput initializationInput =
+                new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum);
+            processor.initialize(initializationInput);
+            processorMap.put(shardId, processor);
+          });
+    return processorMap;
+  }
+
+  /**
+   * Polls the consumer until at least {@code numEvents} envelopes have been received in total across {@code ssps}.
+   * It gives up after {@value #MAX_POLL_ATTEMPTS} polls.
+   *
+   * @return the received envelopes grouped by SSP, in the order they were polled
+   * @throws SamzaException if fewer than {@code numEvents} envelopes arrive within the allowed number of polls
+   */
+  private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> readEvents(Set<SystemStreamPartition> ssps,
+      KinesisSystemConsumer consumer, int numEvents) throws InterruptedException {
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<>();
+    int totalEventsConsumed = 0;
+
+    for (int attempt = 0; attempt < MAX_POLL_ATTEMPTS && totalEventsConsumed < numEvents; attempt++) {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> receivedMessages =
+          consumer.poll(ssps, POLL_TIMEOUT_MS);
+      for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry : receivedMessages.entrySet()) {
+        messages.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).addAll(entry.getValue());
+        totalEventsConsumed += entry.getValue().size();
+      }
+    }
+
+    if (totalEventsConsumed < numEvents) {
+      String msg = String.format("Received only %d of %d events", totalEventsConsumed, numEvents);
+      throw new SamzaException(msg);
+    }
+    return messages;
+  }
+
+  /**
+   * Checks that {@code outputRecords} match {@code inputRecords} one-to-one and in order. For each pair it compares
+   * the key, sequence number, arrival timestamp, shard id, payload and offset.
+   */
+  private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
+    Assert.assertEquals(inputRecords.size(), outputRecords.size());
+    Iterator<IncomingMessageEnvelope> outputRecordsIter = outputRecords.iterator();
+    inputRecords.forEach(record -> {
+        IncomingMessageEnvelope envelope = outputRecordsIter.next();
+        String outputKey = (String) envelope.getKey();
+        KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+        Assert.assertEquals(record.getPartitionKey(), outputKey);
+        Assert.assertEquals(record.getSequenceNumber(), kinesisMessageEnvelope.getSequenceNumber());
+        Assert.assertEquals(record.getApproximateArrivalTimestamp(),
+            kinesisMessageEnvelope.getApproximateArrivalTimestamp());
+        Assert.assertEquals(shardId, kinesisMessageEnvelope.getShardId());
+        ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
+        // Reset the input buffer's position so equals() compares the full payload.
+        record.getData().rewind();
+        Assert.assertEquals(record.getData(), outputData);
+        verifyOffset(envelope.getOffset(), record, shardId);
+      });
+  }
+
+  /** Parses {@code offset} and checks that it carries the input record's sequence number and {@code shardId}. */
+  private void verifyOffset(String offset, Record inputRecord, String shardId) {
+    KinesisSystemConsumerOffset ckpt = KinesisSystemConsumerOffset.parse(offset);
+    Assert.assertEquals(inputRecord.getSequenceNumber(), ckpt.getSeqNumber());
+    Assert.assertEquals(shardId, ckpt.getShardId());
+  }
+
+  /** Initializes the consumer's private {@code metrics} field (read via reflection) for {@code stream}. */
+  private void initializeMetrics(KinesisSystemConsumer consumer, String stream)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("metrics");
+    f.setAccessible(true);
+    KinesisSystemConsumerMetrics metrics = (KinesisSystemConsumerMetrics) f.get(consumer);
+    metrics.initializeMetrics(Collections.singleton(stream));
+  }
+
+  /** Returns the consumer's private {@code processors} map (SSP to processor) via reflection. */
+  @SuppressWarnings("unchecked")
+  private Map<SystemStreamPartition, KinesisRecordProcessor> getProcessorMap(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("processors");
+    f.setAccessible(true);
+    return (Map<SystemStreamPartition, KinesisRecordProcessor>) f.get(consumer);
+  }
+
+  /** Returns whether {@code ssp} is in the SSP allocator's {@code availableSsps} set for its stream. */
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(KinesisSystemConsumer consumer, SystemStreamPartition ssp)
+      throws NoSuchFieldException, IllegalAccessException {
+    SSPAllocator sspAllocator = getSspAllocator(consumer);
+    Field f = sspAllocator.getClass().getDeclaredField("availableSsps");
+    f.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> availableSsps = (Map<String, Set<SystemStreamPartition>>) f.get(
+        sspAllocator);
+    return availableSsps.containsKey(ssp.getStream()) && availableSsps.get(ssp.getStream()).contains(ssp);
+  }
+
+  /** Returns the consumer's private {@code sspAllocator} field via reflection. */
+  private SSPAllocator getSspAllocator(KinesisSystemConsumer consumer)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = consumer.getClass().getDeclaredField("sspAllocator");
+    f.setAccessible(true);
+    return (SSPAllocator) f.get(consumer);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
new file mode 100644
index 0000000..615a06e
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumerOffset.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Round-trip tests for {@link KinesisSystemConsumerOffset}: an offset rendered with {@code toString()} and read back
+ * with {@code parse()} must equal the original, and must differ from offsets with another shard id or sequence number.
+ */
+public class TestKinesisSystemConsumerOffset {
+  @Test
+  public void testEquality() {
+    KinesisSystemConsumerOffset original = new KinesisSystemConsumerOffset("shard-00000", "123456");
+    Assert.assertEquals(original, roundTrip(original));
+  }
+
+  @Test
+  public void testInEquality() {
+    KinesisSystemConsumerOffset reference = new KinesisSystemConsumerOffset("shard-00000", "123456");
+
+    // Same sequence number, different shard id.
+    KinesisSystemConsumerOffset otherShard = roundTrip(new KinesisSystemConsumerOffset("shard-00001", "123456"));
+    Assert.assertFalse(reference.equals(otherShard));
+
+    // Same shard id, different sequence number.
+    KinesisSystemConsumerOffset otherSeqNumber = roundTrip(new KinesisSystemConsumerOffset("shard-00000", "123457"));
+    Assert.assertFalse(reference.equals(otherSeqNumber));
+  }
+
+  /** Serializes {@code offset} with {@code toString()} and parses it back. */
+  private static KinesisSystemConsumerOffset roundTrip(KinesisSystemConsumerOffset offset) {
+    return KinesisSystemConsumerOffset.parse(offset.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
new file mode 100644
index 0000000..0533a29
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestSSPAllocator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link SSPAllocator}: allocating and freeing partitions of a single stream, exhausting the available
+ * partitions, and rejecting frees of partitions that are not currently allocated.
+ */
+public class TestSSPAllocator {
+  private static final String SYSTEM = "kinesis";
+  private static final String STREAM = "stream";
+  private static final int NUM_PARTITIONS = 2;
+
+  @Test
+  public void testAllocateAndFree() throws NoAvailablePartitionException, NoSuchFieldException, IllegalAccessException {
+    List<SystemStreamPartition> ssps = createSsps(NUM_PARTITIONS);
+    SSPAllocator allocator = createAllocatorWithFreeSsps(ssps);
+
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+
+    SystemStreamPartition ssp = allocator.allocate(STREAM);
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+    Assert.assertEquals(ssps.get(0), ssp);
+
+    ssp = allocator.allocate(STREAM);
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(1)));
+    Assert.assertEquals(ssps.get(1), ssp);
+
+    allocator.free(ssps.get(1));
+    Assert.assertFalse(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+
+    allocator.free(ssps.get(0));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(0)));
+    Assert.assertTrue(isSspAvailable(allocator, ssps.get(1)));
+  }
+
+  @Test (expected = NoAvailablePartitionException.class)
+  public void testAssignMoreThanMaxPartitions() throws NoAvailablePartitionException {
+    SSPAllocator allocator = createAllocatorWithFreeSsps(createSsps(NUM_PARTITIONS));
+
+    allocator.allocate(STREAM);
+    allocator.allocate(STREAM);
+    allocator.allocate(STREAM); // An exception should be thrown at this point.
+  }
+
+  @Test (expected = IllegalArgumentException.class)
+  public void testFreeSameSspTwice() throws NoAvailablePartitionException {
+    SSPAllocator allocator = createAllocatorWithFreeSsps(createSsps(NUM_PARTITIONS));
+
+    SystemStreamPartition ssp = allocator.allocate(STREAM);
+    allocator.free(ssp);
+    allocator.free(ssp); // An exception should be thrown at this point.
+  }
+
+  @Test (expected = IllegalArgumentException.class)
+  public void testFreeUnallocatedSsp() throws NoAvailablePartitionException {
+    List<SystemStreamPartition> ssps = createSsps(NUM_PARTITIONS);
+    SSPAllocator allocator = createAllocatorWithFreeSsps(ssps);
+
+    // The first allocation takes ssps.get(0) (see testAllocateAndFree), so ssps.get(1) is still free.
+    allocator.allocate(STREAM);
+    allocator.free(ssps.get(1)); // An exception should be thrown at this point.
+  }
+
+  /** Builds SSPs for partitions {@code 0..numPartitions-1} of {@link #STREAM}. */
+  private static List<SystemStreamPartition> createSsps(int numPartitions) {
+    List<SystemStreamPartition> ssps = new ArrayList<>();
+    IntStream.range(0, numPartitions)
+        .forEach(i -> ssps.add(new SystemStreamPartition(SYSTEM, STREAM, new Partition(i))));
+    return ssps;
+  }
+
+  /** Creates an allocator and registers every given SSP as free. */
+  private static SSPAllocator createAllocatorWithFreeSsps(List<SystemStreamPartition> ssps) {
+    SSPAllocator allocator = new SSPAllocator();
+    ssps.forEach(allocator::free);
+    return allocator;
+  }
+
+  /** Reads the allocator's private {@code availableSsps} map to check whether {@code ssp} is currently free. */
+  @SuppressWarnings("unchecked")
+  private boolean isSspAvailable(SSPAllocator sspAllocator, SystemStreamPartition ssp)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field f = sspAllocator.getClass().getDeclaredField("availableSsps");
+    f.setAccessible(true);
+    Map<String, Set<SystemStreamPartition>> availableSsps =
+        (Map<String, Set<SystemStreamPartition>>) f.get(sspAllocator);
+    return availableSsps.containsKey(ssp.getStream()) && availableSsps.get(ssp.getStream()).contains(ssp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 0fe3dfa..e50e816 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -24,9 +24,8 @@ include \
   'samza-rest',
   'samza-shell',
   'samza-azure',
-  'samza-sql'
-
-
+  'samza-sql',
+  'samza-aws'
 
 def scalaModules = [
         'samza-core',


[2/2] samza git commit: SAMZA-1515; Implement a consumer for Kinesis

Posted by ja...@apache.org.
SAMZA-1515; Implement a consumer for Kinesis

Author: Aditya Toomula <at...@atoomula-ld1.linkedin.biz>

Reviewers: Jagadish<ja...@apache.org>

Closes #368 from atoomula/kinesis


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9961023f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9961023f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9961023f

Branch: refs/heads/master
Commit: 9961023f7bf7c4b19804fb4e50a14c86d6fc9233
Parents: 5e68d62
Author: Aditya Toomula <at...@atoomula-ld1.linkedin.biz>
Authored: Tue Nov 28 13:12:10 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 28 13:12:10 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  33 ++
 .../kinesis/KinesisAWSCredentialsProvider.java  |  69 +++++
 .../samza/system/kinesis/KinesisConfig.java     | 287 ++++++++++++++++++
 .../system/kinesis/KinesisSystemAdmin.java      | 124 ++++++++
 .../system/kinesis/KinesisSystemFactory.java    |  87 ++++++
 .../KinesisIncomingMessageEnvelope.java         |  62 ++++
 .../consumer/KinesisRecordProcessor.java        | 208 +++++++++++++
 .../KinesisRecordProcessorListener.java         |  51 ++++
 .../kinesis/consumer/KinesisSystemConsumer.java | 256 ++++++++++++++++
 .../consumer/KinesisSystemConsumerOffset.java   | 107 +++++++
 .../consumer/NoAvailablePartitionException.java |  38 +++
 .../system/kinesis/consumer/SSPAllocator.java   |  73 +++++
 .../metrics/KinesisSystemConsumerMetrics.java   | 106 +++++++
 .../system/kinesis/metrics/SamzaHistogram.java  |  63 ++++
 .../TestKinesisAWSCredentialsProvider.java      |  60 ++++
 .../samza/system/kinesis/TestKinesisConfig.java | 132 ++++++++
 .../kinesis/TestKinesisSystemFactory.java       | 115 +++++++
 .../consumer/TestKinesisRecordProcessor.java    | 301 +++++++++++++++++++
 .../consumer/TestKinesisSystemConsumer.java     | 270 +++++++++++++++++
 .../TestKinesisSystemConsumerOffset.java        |  48 +++
 .../kinesis/consumer/TestSSPAllocator.java      | 127 ++++++++
 settings.gradle                                 |   5 +-
 22 files changed, 2619 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 59ff5f2..eddb11c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -220,6 +220,39 @@ project(':samza-azure') {
   }
 }
 
+// Build definition for the samza-aws module (Kinesis system: admin, factory and consumer).
+project(':samza-aws') {
+  apply plugin: 'java'
+  apply plugin: 'checkstyle'
+
+  dependencies {
+    // AWS SDK Kinesis client, Kinesis Client Library (KCL) and Kinesis producer artifacts.
+    // NOTE(review): this change only adds a consumer; confirm amazon-kinesis-producer is actually needed.
+    compile "com.amazonaws:aws-java-sdk-kinesis:1.11.152"
+    compile "com.amazonaws:amazon-kinesis-client:1.7.5"
+    compile "com.amazonaws:amazon-kinesis-producer:0.10.0"
+    compile "io.dropwizard.metrics:metrics-core:3.1.2"
+    compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
+    compile "org.codehaus.jackson:jackson-mapper-asl:1.9.7"
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
+    // NOTE(review): KinesisConfig imports org.apache.http.conn.socket.ConnectionSocketFactory at compile time,
+    // yet httpclient is only declared as runtime here; presumably it arrives transitively via the AWS SDK — confirm.
+    runtime "org.apache.httpcomponents:httpclient:4.5.2"
+    runtime "org.apache.httpcomponents:httpcore:4.4.5"
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+  }
+
+  // Resolve this module's dependencies from Maven Central.
+  repositories {
+    maven {
+      url "https://repo1.maven.org/maven2/"
+    }
+  }
+
+  // Use the project-wide checkstyle rules.
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    toolVersion = "$checkstyleVersion"
+  }
+}
+
+
 project(":samza-autoscaling_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'checkstyle'

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
new file mode 100644
index 0000000..a37cfb4
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+
+
+/**
+ * AWSCredentialsProvider implementation that takes in accessKey and secretKey directly. Requires both accessKey and
+ * secretKey to be non-null for it to create a BasicAWSCredentials instance. Otherwise, it creates an AWSCredentials
+ * instance with null keys.
+ */
+public class KinesisAWSCredentialsProvider implements AWSCredentialsProvider {
+  private final AWSCredentials creds;
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisAWSCredentialsProvider.class.getName());
+
+  public KinesisAWSCredentialsProvider(String accessKey, String secretKey) {
+    if (StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey)) {
+      creds = new AWSCredentials() {
+        @Override
+        public String getAWSAccessKeyId() {
+          return null;
+        }
+
+        @Override
+        public String getAWSSecretKey() {
+          return null;
+        }
+      };
+      LOG.info("Could not load credentials from KinesisAWSCredentialsProvider");
+    } else {
+      creds = new BasicAWSCredentials(accessKey, secretKey);
+      LOG.info("Loaded credentials from KinesisAWSCredentialsProvider");
+    }
+  }
+
+  @Override
+  public AWSCredentials getCredentials() {
+    return creds;
+  }
+
+  @Override
+  public void refresh() {
+    //no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
new file mode 100644
index 0000000..a4ac40d
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.ClientConfiguration;
+
+
+/**
+ * Configs for Kinesis system. It contains three sets of configs:
+ * <ol>
+ *   <li> Configs required by Samza Kinesis Consumer.
+ *   <li> Configs that are AWS client specific provided at system scope {@link ClientConfiguration}
+ *   <li> Configs that are KCL specific (could be provided either at system scope or stream scope)
+ *        {@link KinesisClientLibConfiguration}
+ * </ol>
+ * AWS client and KCL configs are applied reflectively: a key {@code Foo} under the AWS client prefix invokes
+ * {@code ClientConfiguration.setFoo(...)}, and under a KCL prefix invokes
+ * {@code KinesisClientLibConfiguration.withFoo(...)}.
+ */
+public class KinesisConfig extends MapConfig {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName());
+
+  private static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
+  private static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
+
+  private static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
+  private static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
+
+  private static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
+  private static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
+  private static final String DEFAULT_CONFIG_PROXY_HOST = "";
+  private static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
+  private static final int DEFAULT_CONFIG_PROXY_PORT = 0;
+
+  private static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
+  private static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";
+
+  /** Placeholder written to logs and error messages instead of the value of a credential-like key. */
+  private static final String MASKED_VALUE = "******";
+
+  public KinesisConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * Return a set of streams from the config for a given system.
+   * @param system name of the system
+   * @return a set of streams
+   * @throws IllegalArgumentException if a stream-scoped key has no property after the stream name
+   */
+  public Set<String> getKinesisStreams(String system) {
+    // build stream-level configs
+    Config streamsConfig = subset(String.format("systems.%s.streams.", system), true);
+    // all properties should now start with stream name
+    Set<String> streams = new HashSet<>();
+    streamsConfig.keySet().forEach(key -> {
+        String[] parts = key.split("\\.", 2);
+        if (parts.length != 2) {
+          throw new IllegalArgumentException("Ill-formatted stream config: " + key);
+        }
+        streams.add(parts[0]);
+      });
+    return streams;
+  }
+
+  /**
+   * Get KCL config for a given system stream.
+   * @param system name of the system
+   * @param stream name of the stream
+   * @param appName name of the application
+   * @return Stream scoped KCL configs required to build
+   *         {@link KinesisClientLibConfiguration}
+   */
+  public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) {
+    ClientConfiguration clientConfig = getAWSClientConfig(system);
+    // A random suffix keeps worker ids distinct across processes sharing the same application name.
+    String workerId = appName + "-" + UUID.randomUUID();
+    InitialPositionInStream startPos = InitialPositionInStream.LATEST;
+    AWSCredentialsProvider provider = credentialsProviderForStream(system, stream);
+    KinesisClientLibConfiguration kinesisClientLibConfiguration =
+        new KinesisClientLibConfiguration(appName, stream, provider, workerId)
+            .withRegionName(getRegion(system, stream).getName())
+            .withKinesisClientConfig(clientConfig)
+            .withCloudWatchClientConfig(clientConfig)
+            .withDynamoDBClientConfig(clientConfig)
+            .withInitialPositionInStream(startPos)
+            .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics.
+    // First, get system scoped configs for KCL and override with configs set at stream scope.
+    setKinesisClientLibConfigs(
+        subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system)), kinesisClientLibConfiguration);
+    setKinesisClientLibConfigs(subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream)),
+        kinesisClientLibConfiguration);
+    return kinesisClientLibConfiguration;
+  }
+
+  /**
+   * Get the Kinesis secret key for the system stream
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis secret key, or null if not configured
+   */
+  protected String getStreamSecretKey(String system, String stream) {
+    return get(String.format(CONFIG_STREAM_SECRET_KEY, system, stream));
+  }
+
+  /**
+   * Get SSL socket factory for the proxy for a given system. This default implementation returns null;
+   * subclasses may override it.
+   * @param system name of the system
+   * @return ConnectionSocketFactory
+   */
+  protected ConnectionSocketFactory getSSLSocketFactory(String system) {
+    return null;
+  }
+
+  /**
+   * @param system name of the system
+   * @return {@link ClientConfiguration} which has options controlling how the client connects to kinesis
+   *         (eg: proxy settings, retry counts, etc)
+   */
+  ClientConfiguration getAWSClientConfig(String system) {
+    ClientConfiguration awsClientConfig = new ClientConfiguration();
+    setAwsClientConfigs(subset(String.format(CONFIG_AWS_CLIENT_CONFIG, system)), awsClientConfig);
+    awsClientConfig.getApacheHttpClientConfig().setSslSocketFactory(getSSLSocketFactory(system));
+    return awsClientConfig;
+  }
+
+  /**
+   * Get the proxy host as a system level config. This is needed when
+   * users need to go through a proxy for the Kinesis connections.
+   * @param system name of the system
+   * @return proxy host name or empty string if not defined
+   */
+  String getProxyHost(String system) {
+    return get(String.format(CONFIG_PROXY_HOST, system), DEFAULT_CONFIG_PROXY_HOST);
+  }
+
+  /**
+   * Get the proxy port number as a system level config. This is needed when
+   * users need to go through a proxy for the Kinesis connections.
+   * @param system name of the system
+   * @return proxy port number or 0 if not defined
+   */
+  int getProxyPort(String system) {
+    return getInt(String.format(CONFIG_PROXY_PORT, system), DEFAULT_CONFIG_PROXY_PORT);
+  }
+
+  /**
+   * Get the Kinesis region for the system stream. A stream scoped region takes precedence over the system scoped one.
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis region
+   * @throws IllegalArgumentException if no region is configured or the configured name is not a known region
+   */
+  Region getRegion(String system, String stream) {
+    String streamRegionKey = String.format(CONFIG_STREAM_REGION, system, stream);
+    String systemRegionKey = String.format(CONFIG_SYSTEM_REGION, system);
+    String name = get(streamRegionKey, get(systemRegionKey));
+    if (StringUtils.isEmpty(name)) {
+      // Without this check Regions.fromName fails with the opaque "Cannot create enum from null value!".
+      throw new IllegalArgumentException(String.format(
+          "No Kinesis region configured for system %s, stream %s; set %s or %s",
+          system, stream, streamRegionKey, systemRegionKey));
+    }
+    return Region.getRegion(Regions.fromName(name));
+  }
+
+  /**
+   * Get the Kinesis access key name for the system stream
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis access key, or null if not configured
+   */
+  String getStreamAccessKey(String system, String stream) {
+    return get(String.format(CONFIG_STREAM_ACCESS_KEY, system, stream));
+  }
+
+  /**
+   * Get the appropriate CredentialProvider for a given system stream.
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return AWSCredentialsProvider
+   */
+  AWSCredentialsProvider credentialsProviderForStream(String system, String stream) {
+    // Try to load credentials in the following order:
+    // 1. Access key from the config and passed in secretKey
+    // 2. From the default credential provider chain (environment variables, system properties, AWS profile file, etc)
+    return new AWSCredentialsProviderChain(
+        new KinesisAWSCredentialsProvider(getStreamAccessKey(system, stream), getStreamSecretKey(system, stream)),
+        new DefaultAWSCredentialsProviderChain());
+  }
+
+  /** Applies each {@code Foo -> value} entry by calling {@code clientConfig.setFoo(value)}. */
+  private void setAwsClientConfigs(Config config, ClientConfiguration clientConfig) {
+    applyConfigsReflectively(config, clientConfig, ClientConfiguration.class, "set");
+  }
+
+  /** Applies each {@code Foo -> value} entry by calling {@code kinesisLibConfig.withFoo(value)}. */
+  private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) {
+    applyConfigsReflectively(config, kinesisLibConfig, KinesisClientLibConfiguration.class, "with");
+  }
+
+  /**
+   * For every entry with a non-empty value, invokes the public single-argument method named
+   * {@code methodPrefix + key} on {@code target}, converting the value to that method's parameter type.
+   * If several overloads share the name, the first one (in {@link Class#getMethods()} order) whose parameter
+   * type is supported is used. Keys without a usable method are logged and skipped.
+   *
+   * @throws IllegalArgumentException if the value cannot be converted or the invocation fails
+   */
+  private static void applyConfigsReflectively(Map<String, String> config, Object target, Class<?> targetClass,
+      String methodPrefix) {
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (StringUtils.isEmpty(value)) {
+        continue;
+      }
+      String methodName = methodPrefix + key;
+      boolean found = false;
+      boolean applied = false;
+      for (Method method : targetClass.getMethods()) {
+        // Only single-argument methods can take a config value; others (e.g. getters-like no-arg methods) are skipped.
+        if (!method.getName().equals(methodName) || method.getParameterCount() != 1) {
+          continue;
+        }
+        found = true;
+        try {
+          Object arg = convertValue(method.getParameterTypes()[0], value);
+          if (arg == null) {
+            // Unsupported parameter type; another overload with the same name may still accept the value.
+            continue;
+          }
+          method.invoke(target, arg);
+        } catch (Exception e) {
+          throw new IllegalArgumentException(
+              String.format("Error trying to set field %s with the value '%s'", key, loggableValue(key, value)), e);
+        }
+        applied = true;
+        LOG.info("Loaded property " + key + " = " + loggableValue(key, value));
+        break;
+      }
+      if (!found) {
+        LOG.warn("Property " + key + " ignored as there is no corresponding set method");
+      } else if (!applied) {
+        LOG.warn("Property " + key + " ignored as its set method takes an unsupported parameter type");
+      }
+    }
+  }
+
+  /**
+   * Converts {@code value} to an argument of parameter type {@code type}.
+   * @return the converted argument, or null if {@code type} is not supported
+   */
+  private static Object convertValue(Class<?> type, String value) {
+    if (type == long.class) {
+      return Long.valueOf(value);
+    } else if (type == int.class) {
+      return Integer.valueOf(value);
+    } else if (type == boolean.class) {
+      return Boolean.valueOf(value);
+    } else if (type == String.class) {
+      return value;
+    } else if (type == InitialPositionInStream.class) {
+      // Locale.ROOT: default-locale upper-casing (e.g. Turkish dotted I) would break enum lookup.
+      return InitialPositionInStream.valueOf(value.toUpperCase(java.util.Locale.ROOT));
+    }
+    return null;
+  }
+
+  /** Returns a masked placeholder for credential-like keys (e.g. ProxyPassword) so secrets are not logged. */
+  private static String loggableValue(String key, String value) {
+    String lowerKey = key.toLowerCase(java.util.Locale.ROOT);
+    return lowerKey.contains("password") || lowerKey.contains("secret") ? MASKED_VALUE : value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
new file mode 100644
index 0000000..4843276
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+
+
+/**
+ * A Kinesis-based implementation of SystemAdmin.
+ */
+public class KinesisSystemAdmin implements SystemAdmin {
+
+  private static final SystemStreamMetadata.SystemStreamPartitionMetadata SYSTEM_STREAM_PARTITION_METADATA =
+      new SystemStreamMetadata.SystemStreamPartitionMetadata(ExtendedSequenceNumber.TRIM_HORIZON.getSequenceNumber(),
+          ExtendedSequenceNumber.LATEST.getSequenceNumber(),
+          ExtendedSequenceNumber.LATEST.getSequenceNumber());
+
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemAdmin.class.getName());
+
+  private final String system;
+  private final KinesisConfig kConfig;
+
+  public KinesisSystemAdmin(String system, KinesisConfig kConfig) {
+    this.system = system;
+    this.kConfig = kConfig;
+  }
+
+  /**
+   * Source of truth for checkpointing is always kinesis and the offsets written to samza checkpoint topic are ignored.
+   * Hence, return null for the getOffsetsAfter for a supplied map of ssps.
+   */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<>();
+
+    for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
+      offsetsAfter.put(systemStreamPartition, null);
+    }
+
+    return offsetsAfter;
+  }
+
+  /**
+   * Source of truth for checkpointing is always kinesis and the offsets given by samza are always ignored by KCL.
+   * Hence, return a placeholder for each ssp.
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return streamNames.stream().collect(Collectors.toMap(Function.identity(), this::createSystemStreamMetadata));
+  }
+
+  private SystemStreamMetadata createSystemStreamMetadata(String stream) {
+    LOG.info("create stream metadata for stream {} based on aws stream", stream);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadata = new HashMap<>();
+    AmazonKinesisClient client = null;
+
+    try {
+      ClientConfiguration clientConfig = kConfig.getAWSClientConfig(system);
+      AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+          .withCredentials(kConfig.credentialsProviderForStream(system, stream))
+          .withClientConfiguration(clientConfig);
+      builder.setRegion(kConfig.getRegion(system, stream).getName());
+      client = (AmazonKinesisClient) builder.build();
+      StreamDescription desc = client.describeStream(stream).getStreamDescription();
+      IntStream.range(0, desc.getShards().size())
+          .forEach(i -> metadata.put(new Partition(i), SYSTEM_STREAM_PARTITION_METADATA));
+    } catch (Exception e) {
+      String errMsg = "couldn't load metadata for stream " + stream;
+      LOG.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    } finally {
+      if (client != null) {
+        client.shutdown();
+      }
+    }
+
+    return new SystemStreamMetadata(stream, metadata);
+  }
+
+  /**
+   * Checkpoints are written to KCL and is always the source of truth. Format for Samza offsets is different from
+   * that of Kinesis checkpoint. Samza offsets are not comparable. Hence, return null.
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
new file mode 100644
index 0000000..558e871
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+
+import org.apache.samza.system.kinesis.consumer.KinesisSystemConsumer;
+
+
+/**
+ * A Kinesis-based implementation of {@link SystemFactory}.
+ *
+ * <p>It provides a consumer and an admin. No producer is provided: {@link #getProducer} always returns {@code null}.
+ */
+public class KinesisSystemFactory implements SystemFactory {
+  /**
+   * Creates a {@link KinesisSystemConsumer} for the given system.
+   *
+   * @param system name of the Kinesis system
+   * @param config job configuration
+   * @param registry registry passed to the consumer for its metrics
+   * @return a new consumer for {@code system}
+   */
+  @Override
+  public SystemConsumer getConsumer(String system, Config config, MetricsRegistry registry) {
+    KinesisConfig kConfig = new KinesisConfig(config);
+    return new KinesisSystemConsumer(system, kConfig, registry);
+  }
+
+  /**
+   * Producing to Kinesis is not implemented by this system.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public SystemProducer getProducer(String system, Config config, MetricsRegistry registry) {
+    return null;
+  }
+
+  /**
+   * Validates the job configuration for this system, then creates a {@link KinesisSystemAdmin}.
+   *
+   * @param system name of the Kinesis system
+   * @param config job configuration
+   * @return a new admin for {@code system}
+   * @throws ConfigException if the configuration is rejected by {@link #validateConfig(String, Config)}
+   */
+  @Override
+  public SystemAdmin getAdmin(String system, Config config) {
+    validateConfig(system, config);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    return new KinesisSystemAdmin(system, kConfig);
+  }
+
+  /**
+   * Verifies that the job is configured in a way the Kinesis consumer supports:
+   * <ul>
+   *   <li> the SSP grouper factory is {@link AllSspToSingleTaskGrouperFactory};
+   *   <li> no broadcast stream belongs to {@code system};
+   *   <li> none of the Kinesis streams of {@code system} is a bootstrap stream.
+   * </ul>
+   *
+   * @param system name of the Kinesis system
+   * @param config job configuration
+   * @throws ConfigException if any of the above constraints is violated
+   */
+  protected void validateConfig(String system, Config config) {
+    // Kinesis system does not support groupers other than AllSspToSingleTaskGrouper
+    JobConfig jobConfig = new JobConfig(config);
+    String grouperFactory = jobConfig.getSystemStreamPartitionGrouperFactory();
+    String requiredGrouperFactory = AllSspToSingleTaskGrouperFactory.class.getCanonicalName();
+    // Compare from the known non-null side so an unexpected null is reported as a ConfigException, not an NPE.
+    if (!requiredGrouperFactory.equals(grouperFactory)) {
+      String errMsg = String.format("Incorrect Grouper %s used for KinesisSystemConsumer %s. Please set the %s config"
+          + " to %s.", grouperFactory, system, JobConfig.SSP_GROUPER_FACTORY(), requiredGrouperFactory);
+      throw new ConfigException(errMsg);
+    }
+
+    // Kinesis streams cannot be configured as broadcast streams
+    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    if (taskConfig.getBroadcastSystemStreams().stream().anyMatch(ss -> system.equals(ss.getSystem()))) {
+      throw new ConfigException("Kinesis streams cannot be configured as broadcast streams.");
+    }
+
+    // Kinesis streams cannot be configured as bootstrap streams
+    KinesisConfig kConfig = new KinesisConfig(config);
+    // The StreamConfig view does not depend on the stream being checked, so build it once instead of per stream.
+    StreamConfig streamConfig = new StreamConfig(kConfig);
+    kConfig.getKinesisStreams(system).forEach(stream -> {
+        SystemStream ss = new SystemStream(system, stream);
+        if (streamConfig.getBootstrapEnabled(ss)) {
+          throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
+        }
+      });
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
new file mode 100644
index 0000000..95e6b6a
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.Date;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Kinesis record with payload and some metadata.
+ *
+ * <p>Besides the standard envelope fields, it carries the Kinesis shard id, the record's sequence number and the
+ * record's approximate arrival timestamp.
+ */
+public class KinesisIncomingMessageEnvelope extends IncomingMessageEnvelope {
+  private final String shardId;
+  private final String sequenceNumber;
+  // Private copy: java.util.Date is mutable, so it is copied on the way in and on the way out.
+  private final Date approximateArrivalTimestamp;
+
+  /**
+   * @param systemStreamPartition partition the record is delivered on
+   * @param offset Samza offset of the record
+   * @param key record key
+   * @param message record payload
+   * @param shardId Kinesis shard the record came from
+   * @param sequenceNumber Kinesis sequence number of the record
+   * @param approximateArrivalTimestamp approximate arrival time of the record; {@code null} is accepted. The value
+   *        is copied, so later changes to the argument do not affect this envelope.
+   */
+  public KinesisIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key,
+      Object message, String shardId, String sequenceNumber, Date approximateArrivalTimestamp) {
+    super(systemStreamPartition, offset, key, message);
+    this.shardId = shardId;
+    this.sequenceNumber = sequenceNumber;
+    this.approximateArrivalTimestamp = copyOf(approximateArrivalTimestamp);
+  }
+
+  /** @return the Kinesis shard id the record came from */
+  public String getShardId() {
+    return shardId;
+  }
+
+  /** @return the Kinesis sequence number of the record */
+  public String getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  /**
+   * @return a copy of the approximate arrival timestamp, or {@code null} if none was provided; mutating the returned
+   *         value does not affect this envelope
+   */
+  public Date getApproximateArrivalTimestamp() {
+    return copyOf(approximateArrivalTimestamp);
+  }
+
+  @Override
+  public String toString() {
+    return "KinesisIncomingMessageEnvelope:: shardId:" + shardId + ", sequenceNumber:" + sequenceNumber
+        + ", approximateArrivalTimestamp:" + approximateArrivalTimestamp + ", message:" + getMessage();
+  }
+
+  /** Returns an independent copy of {@code date}, or {@code null} when {@code date} is {@code null}. */
+  private static Date copyOf(Date date) {
+    return date == null ? null : new Date(date.getTime());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
new file mode 100644
index 0000000..53ff27f
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.List;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * Record processor for AWS kinesis stream. It does the following:
+ * <ul>
+ *   <li> it is bound to the ssp it is constructed with; when KCL assigns a shard in the initialize API, it records
+ *        the shard id and the initial sequence number.
+ *   <li> when records are received in processRecords API, it hands them to its listener, which translates them to
+ *        IncomingMessageEnvelopes and enqueues them.
+ *   <li> when checkpoint API is called by samza, it checkpoints via KCL to Kinesis.
+ *   <li> when shutdown API is called by KCL, based on the shutdown reason, it takes necessary action.
+ * </ul>
+ *
+ * initialize, processRecords and shutdown APIs are never called concurrently on a processor instance. However,
+ * checkpoint API could be called by Samza thread while processRecords and shutdown callback APIs are invoked by KCL.
+ * Please note that the APIs for different record processor instances could be called concurrently.
+ */
+
+public class KinesisRecordProcessor implements IRecordProcessor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class.getName());
+  static final long POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS = 1000;
+
+  private final SystemStreamPartition ssp;
+  private final KinesisRecordProcessorListener listener;
+
+  // Written by the KCL thread (initialize/processRecords) and read by the Samza thread (getShardId/checkpoint);
+  // volatile so that the Samza thread sees the KCL thread's writes.
+  private volatile String shardId;
+  private volatile IRecordProcessorCheckpointer checkpointer;
+  private volatile ExtendedSequenceNumber initSeqNumber;
+
+  private volatile ExtendedSequenceNumber lastProcessedRecordSeqNumber;
+  private volatile ExtendedSequenceNumber lastCheckpointedRecordSeqNumber;
+
+  // Only touched from KCL callbacks, which are never invoked concurrently for one processor instance.
+  private boolean shutdownRequested = false;
+
+  KinesisRecordProcessor(SystemStreamPartition ssp, KinesisRecordProcessorListener listener) {
+    this.ssp = ssp;
+    this.listener = listener;
+  }
+
+  /**
+   * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
+   * (via processRecords).
+   *
+   * @param initializationInput Provides information related to initialization
+   */
+  @Override
+  public void initialize(InitializationInput initializationInput) {
+    Validate.isTrue(listener != null, "There is no listener set for the processor.");
+    initSeqNumber = initializationInput.getExtendedSequenceNumber();
+    // Published after initSeqNumber: a thread that observes the shard id also observes the initial sequence number.
+    shardId = initializationInput.getShardId();
+    LOG.info("Initialization done for {} with sequence {}", this, initSeqNumber.getSequenceNumber());
+  }
+
+  /**
+   * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
+   * application. Upon fail over, the new instance will get records with sequence number greater than the checkpoint
+   * position for each partition key.
+   *
+   * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
+   *        to them (eg checkpointing).
+   * @throws IllegalArgumentException if called after {@link #shutdown(ShutdownInput)}
+   */
+  @Override
+  public void processRecords(ProcessRecordsInput processRecordsInput) {
+    // KCL does not send any records to the processor that was shutdown.
+    if (shutdownRequested) {
+      // Same exception type and message as Validate.isTrue, but the message is only formatted on failure.
+      throw new IllegalArgumentException(
+          String.format("KCL returned records after shutdown is called on the processor %s.", this));
+    }
+    // KCL aways gives reference to the same checkpointer instance for a given processor instance.
+    checkpointer = processRecordsInput.getCheckpointer();
+    List<Record> records = processRecordsInput.getRecords();
+    // Empty records are expected when KCL config has CallProcessRecordsEvenForEmptyRecordList set to true.
+    if (!records.isEmpty()) {
+      lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber());
+      listener.onReceiveRecords(ssp, records, processRecordsInput.getMillisBehindLatest());
+    }
+  }
+
+  /**
+   * Invoked by the Samza thread to commit checkpoint for the shard owned by the record processor instance.
+   *
+   * <p>The checkpoint is ignored (with a warning) when the processor is not initialized yet, when
+   * {@code seqNumber} is smaller than the initial sequence number, or when no checkpointer has been received yet.
+   *
+   * @param seqNumber sequenceNumber to checkpoint for the shard owned by this processor instance.
+   * @throws SamzaException if KCL reports an invalid internal state or throttles the checkpoint
+   */
+  public void checkpoint(String seqNumber) {
+    ExtendedSequenceNumber seqNumberToCheckpoint = new ExtendedSequenceNumber(seqNumber);
+    // Read each volatile field once so the checks and the use below see the same values.
+    ExtendedSequenceNumber initialSeqNumber = initSeqNumber;
+    if (initialSeqNumber == null) {
+      LOG.warn("Ignoring checkpointing for {} with seqNumber {} because the processor is not initialized yet.",
+          this, seqNumber);
+      return;
+    }
+
+    if (initialSeqNumber.compareTo(seqNumberToCheckpoint) > 0) {
+      LOG.warn("Samza called checkpoint with seqNumber {} smaller than initial seqNumber {} for {}. Ignoring it!",
+          seqNumber, initialSeqNumber, this);
+      return;
+    }
+
+    IRecordProcessorCheckpointer currentCheckpointer = checkpointer;
+    if (currentCheckpointer == null) {
+      // checkpointer could be null as a result of shard re-assignment before the first record is processed.
+      LOG.warn("Ignoring checkpointing for {} with seqNumber {} because of re-assignment.", this, seqNumber);
+      return;
+    }
+
+    try {
+      currentCheckpointer.checkpoint(seqNumber);
+      lastCheckpointedRecordSeqNumber = seqNumberToCheckpoint;
+    } catch (ShutdownException e) {
+      // This can happen as a result of shard re-assignment.
+      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Dropping the checkpoint.",
+          this, seqNumber);
+      LOG.warn(msg, e);
+    } catch (InvalidStateException e) {
+      // This can happen when KCL encounters issues with internal state, eg: dynamoDB table is not found
+      String msg =
+          String.format("Checkpointing %s with seqNumber %s failed with exception.", this, seqNumber);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    } catch (ThrottlingException e) {
+      // Throttling is handled by KCL via the client lib configuration properties. If we still get this exception
+      // despite KCL's back-off, the checkpoint configuration is too aggressive, so fail loudly (keeping the cause).
+      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Checkpoint interval is"
+              + " too aggressive for the provisioned throughput of the dynamoDB table where the checkpoints are stored."
+              + " Either reduce the checkpoint interval -or- increase the throughput of dynamoDB table.", this,
+          seqNumber);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    }
+  }
+
+  /**
+   * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
+   * RecordProcessor instance.
+   *
+   * <p>For {@link ShutdownReason#TERMINATE}, this blocks until the last processed record has been checkpointed by
+   * Samza, then writes the final checkpoint. In every case the listener is notified at the end.
+   *
+   * @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record
+   *        processor.
+   */
+  @Override
+  public void shutdown(ShutdownInput shutdownInput) {
+    LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());
+
+    Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
+    shutdownRequested = true;
+    // shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the
+    // records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the
+    // progress.
+    if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
+      // We need to ensure that all records are processed and checkpointed before going ahead and marking the processing
+      // complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL
+      // to consume from the child shard(s).
+      try {
+        LOG.info("Waiting for all the records for {} to be processed.", this);
+        // Let's poll periodically and block until the last processed record is checkpointed. Also handle the case
+        // where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will
+        // be null.
+        while (lastProcessedRecordSeqNumber != null
+            && !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) {
+          Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
+        }
+        LOG.info("Final checkpoint for {} before shutting down.", this);
+        shutdownInput.getCheckpointer().checkpoint();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status for the calling KCL thread; the final checkpoint is skipped.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting to commit the final checkpoint in the parent shard {}", this, e);
+      } catch (Exception e) {
+        LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
+      }
+    }
+    listener.onShutdown(ssp);
+  }
+
+  /** @return the shard id assigned in {@link #initialize}, or {@code null} before initialization */
+  String getShardId() {
+    return shardId;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("KinesisRecordProcessor: ssp %s shard %s hashCode %s", ssp, shardId, hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
new file mode 100644
index 0000000..72d86b9
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.List;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * Callback interface through which a {@link KinesisRecordProcessor} notifies its owner (the consumer) when it
+ * receives records and when it shuts down.
+ */
+public interface KinesisRecordProcessorListener {
+
+  /**
+   * Called from
+   * {@link KinesisRecordProcessor#processRecords(com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput)}
+   * when the processor receives a batch of records.
+   *
+   * @param ssp the Samza partition to which the records belong
+   * @param records the Kinesis records in the batch
+   * @param millisBehindLatest how far, in milliseconds, the batch lags behind the tip of the stream
+   */
+  void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest);
+
+  /**
+   * Called from
+   * {@link KinesisRecordProcessor#shutdown(com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput)}
+   * once the processor is ready to shut down.
+   *
+   * @param ssp the Samza partition whose processor is shutting down
+   */
+  void onShutdown(SystemStreamPartition ssp);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
new file mode 100644
index 0000000..6afffd3
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointListener;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+import org.apache.samza.util.BlockingEnvelopeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import com.amazonaws.services.kinesis.model.Record;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * The system consumer for Kinesis, extending the {@link BlockingEnvelopeMap}.
+ *
+ * The system consumer creates one KCL Worker per stream, each running in its own thread, by providing a
+ * RecordProcessorFactory. Kinesis Client Library (KCL) uses this factory to instantiate a KinesisRecordProcessor for
+ * each shard in the Kinesis stream. KCL pushes data records to the appropriate record processor, which hands them
+ * back to this consumer to be translated into envelopes and placed into a blocking queue in
+ * {@link BlockingEnvelopeMap}.
+ *
+ * <pre>
+ *   {@code
+ *                                                                                Shard1  +----------------------+
+ *                                                                . --------------------> |KinesisRecordProcessor|
+ *                        Stream1                                 |               Shard2  +----------------------+
+ *                              +-------------+     +-----------------------------+       +----------------------+
+ *             .--------------->|    Worker   |---->|    RecordProcessorFactory   | ----> |KinesisRecordProcessor|
+ *             |                +-------------+     +-------------+---------------+       +----------------------+
+ *             |                                                  |               Shard3  +----------------------+
+ *             |                                                  . --------------------> |KinesisRecordProcessor|
+ *             |                                                                          +----------------------+
+ *             |          Stream2
+ *  +---------------------+     +-------------+     +-----------------------------+        +-------+
+ *  |KinesisSystemConsumer|---->|    Worker   |---->|    RecordProcessorFactory   |------->|  ...  |
+ *  +---------------------+     +-------------+     +-----------------------------+        +-------+
+ *             |
+ *             |
+ *             |
+ *             |
+ *             |                +-----------+
+ *             . -------------->|    ...    |
+ *                              +-----------+
+ *  }
+ *  </pre>
+ * Since KinesisSystemConsumer uses KCL, the checkpoint state is stored in a dynamoDB table which is maintained by KCL.
+ * KinesisSystemConsumer implements CheckpointListener to commit checkpoints via KCL.
+ *
+ * <p>Failures that happen on KCL threads (processor creation, enqueuing records) are recorded and rethrown to Samza
+ * on the next {@link #poll(Set, long)} call.
+ */
+
+public class KinesisSystemConsumer extends BlockingEnvelopeMap implements CheckpointListener, KinesisRecordProcessorListener {
+
+  private static final int MAX_BLOCKING_QUEUE_SIZE = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemConsumer.class.getName());
+
+  private final String system;
+  private final KinesisConfig kConfig;
+  private final KinesisSystemConsumerMetrics metrics;
+  private final SSPAllocator sspAllocator;
+
+  private final Set<String> streams = new HashSet<>();
+  private final Map<SystemStreamPartition, KinesisRecordProcessor> processors = new ConcurrentHashMap<>();
+  private final List<Worker> workers = new LinkedList<>();
+
+  // Created by start() only when at least one stream is registered; may stay null.
+  private ExecutorService executorService;
+
+  // Failure captured on a KCL thread; checked by poll() on the Samza thread.
+  private volatile Exception callbackException;
+
+  public KinesisSystemConsumer(String systemName, KinesisConfig kConfig, MetricsRegistry registry) {
+    super(registry, System::currentTimeMillis, null);
+    this.system = systemName;
+    this.kConfig = kConfig;
+    this.metrics = new KinesisSystemConsumerMetrics(registry);
+    this.sspAllocator = new SSPAllocator();
+  }
+
+  /** Each ssp gets a bounded buffer of {@value #MAX_BLOCKING_QUEUE_SIZE} envelopes. */
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE);
+  }
+
+  /**
+   * Hands an envelope for {@code ssp} to {@link BlockingEnvelopeMap}.
+   *
+   * <p>Runs on a KCL thread. A failure is not rethrown to KCL. Instead it is recorded so that the next
+   * {@link #poll(Set, long)} fails: the record was not enqueued, and silently continuing could let a later checkpoint
+   * move past it.
+   */
+  @Override
+  protected void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+    try {
+      super.put(ssp, envelope);
+    } catch (Exception e) {
+      LOG.error("Exception while putting record. Shutting down SystemStream {}", ssp.getSystemStream(), e);
+      // Keep the first failure; poll() rethrows it on the Samza thread so the container stops instead of losing data.
+      if (callbackException == null) {
+        callbackException = e;
+      }
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Registers an ssp. The offset is ignored, as the log message states. The ssp's stream is remembered, and the ssp
+   * is added to the pool from which record processors are allocated ssps.
+   */
+  @Override
+  public void register(SystemStreamPartition ssp, String offset) {
+    LOG.info("Register called with ssp {} and offset {}. Offset will be ignored.", ssp, offset);
+    String stream = ssp.getStream();
+    streams.add(stream);
+    sspAllocator.free(ssp);
+    super.register(ssp, offset);
+  }
+
+  /** Starts one KCL worker per registered stream, each on its own thread. */
+  @Override
+  public void start() {
+    LOG.info("Start samza consumer for system {}.", system);
+
+    metrics.initializeMetrics(streams);
+
+    if (streams.isEmpty()) {
+      // Executors.newFixedThreadPool rejects a pool size of 0, and there is nothing to consume anyway.
+      LOG.info("No streams registered for system {}. No kinesis workers will be started.", system);
+      return;
+    }
+
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("kinesis-worker-thread-" + system + "-%d")
+        .build();
+    // launch kinesis workers in separate threads, one per stream
+    executorService = Executors.newFixedThreadPool(streams.size(), namedThreadFactory);
+
+    for (String stream : streams) {
+      // KCL Dynamodb table is used for storing the state of processing. By default, the table name is the same as the
+      // application name. Dynamodb table name must be unique for a given account and region (even across different
+      // streams). So, let's create the default one with the combination of job name, job id and stream name. The table
+      // name could be changed by providing a different TableName via KCL specific config.
+      String kinesisApplicationName =
+          kConfig.get(JobConfig.JOB_NAME()) + "-" + kConfig.get(JobConfig.JOB_ID()) + "-" + stream;
+
+      Worker worker = new Worker.Builder()
+          .recordProcessorFactory(createRecordProcessorFactory(stream))
+          .config(kConfig.getKinesisClientLibConfig(system, stream, kinesisApplicationName))
+          .build();
+
+      workers.add(worker);
+
+      // launch kinesis workers in separate thread-pools, one per stream
+      executorService.execute(worker);
+      LOG.info("Started worker for system {} stream {}.", system, stream);
+    }
+  }
+
+  /**
+   * Polls buffered envelopes.
+   *
+   * @throws SamzaException wrapping a failure previously recorded on a KCL thread
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> ssps, long timeout) throws InterruptedException {
+    if (callbackException != null) {
+      throw new SamzaException(callbackException);
+    }
+    return super.poll(ssps, timeout);
+  }
+
+  /** Shuts down all KCL workers and, if one was created, the executor running them. */
+  @Override
+  public void stop() {
+    LOG.info("Stop samza consumer for system {}.", system);
+    workers.forEach(Worker::shutdown);
+    workers.clear();
+    // executorService is null when start() was never called or found no streams to consume.
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+    LOG.info("Kinesis system consumer executor service for system {} is shutdown.", system);
+  }
+
+  // package-private for tests
+  IRecordProcessorFactory createRecordProcessorFactory(String stream) {
+    return () -> {
+      // This code is executed in Kinesis thread context.
+      try {
+        SystemStreamPartition ssp = sspAllocator.allocate(stream);
+        KinesisRecordProcessor processor = new KinesisRecordProcessor(ssp, KinesisSystemConsumer.this);
+        KinesisRecordProcessor prevProcessor = processors.put(ssp, processor);
+        Validate.isTrue(prevProcessor == null, String.format("Adding new kinesis record processor %s while the"
+                + " previous processor %s for the same ssp %s is still active.", processor, prevProcessor, ssp));
+        return processor;
+      } catch (Exception e) {
+        callbackException = e;
+        // This exception is the result of kinesis dynamic shard splits due to which sspAllocator ran out of free ssps.
+        // Set the failed state in consumer which will eventually result in stopping the container. A manual job restart
+        // will be required at this point. After the job restart, the newly created shards will be discovered and enough
+        // ssps will be added to sspAllocator freePool.
+        throw new SamzaException(e);
+      }
+    };
+  }
+
+  /**
+   * Forwards Samza checkpoints to the record processor that currently owns each ssp. A checkpoint is dropped when no
+   * processor owns the ssp, or when that processor owns a different shard than the one in the offset.
+   */
+  @Override
+  public void onCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
+    LOG.info("onCheckpoint called with sspOffsets {}", sspOffsets);
+    sspOffsets.forEach((ssp, offset) -> {
+        KinesisRecordProcessor processor = processors.get(ssp);
+        KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
+        if (processor == null) {
+          LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
+              + " checkpoint {}.", ssp, offset);
+        } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
+          LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
+              + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
+              kinesisOffset.getShardId(), offset);
+        } else {
+          processor.checkpoint(kinesisOffset.getSeqNumber());
+        }
+      });
+  }
+
+  /** Updates lag metrics and enqueues each record as an envelope for {@code ssp}. Runs on a KCL thread. */
+  @Override
+  public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+    metrics.updateMillisBehindLatest(ssp.getStream(), millisBehindLatest);
+    records.forEach(record -> put(ssp, translate(ssp, record)));
+  }
+
+  /** Forgets the processor for {@code ssp} and returns the ssp to the free pool. */
+  @Override
+  public void onShutdown(SystemStreamPartition ssp) {
+    processors.remove(ssp);
+    sspAllocator.free(ssp);
+  }
+
+  // Builds an envelope whose offset encodes the owning shard id and the record's sequence number.
+  private IncomingMessageEnvelope translate(SystemStreamPartition ssp, Record record) {
+    String shardId = processors.get(ssp).getShardId();
+    byte[] payload = new byte[record.getData().remaining()];
+
+    metrics.updateMetrics(ssp.getStream(), record);
+    record.getData().get(payload);
+    KinesisSystemConsumerOffset offset = new KinesisSystemConsumerOffset(shardId, record.getSequenceNumber());
+    return new KinesisIncomingMessageEnvelope(ssp, offset.toString(), record.getPartitionKey(),
+        payload, shardId, record.getSequenceNumber(), record.getApproximateArrivalTimestamp());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
new file mode 100644
index 0000000..13296ca
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Kinesis system consumer related checkpoint information that is stored in the IncomingMessageEnvelope offset.
+ *
+ * It contains the following metadata:
+ * <ul>
+ *   <li> shardId: Kinesis stream shardId.
+ *   <li> seqNumber: sequence number in the above shard.
+ * </ul>
+ *
+ * The offset is serialized to JSON by {@link #toString()} and read back by {@link #parse(String)}; both sides use
+ * UTF-8 so the round trip does not depend on the platform default charset.
+ *
+ * Please note that the source of truth for checkpointing is the AWS dynamoDB table corresponding to the application.
+ * The offset that is stored in Samza checkpoint topic is not used.
+ */
+public class KinesisSystemConsumerOffset {
+
+  @JsonProperty("shardId")
+  private String shardId;
+  @JsonProperty("seqNumber")
+  private String seqNumber;
+
+  /**
+   * Creates an offset; also used by Jackson when deserializing.
+   *
+   * @param shardId Kinesis shard id that {@code seqNumber} belongs to
+   * @param seqNumber sequence number of a record within {@code shardId}
+   */
+  @JsonCreator
+  KinesisSystemConsumerOffset(@JsonProperty("shardId") String shardId,
+      @JsonProperty("seqNumber") String seqNumber) {
+    this.shardId = shardId;
+    this.seqNumber = seqNumber;
+  }
+
+  String getShardId() {
+    return shardId;
+  }
+
+  String getSeqNumber() {
+    return seqNumber;
+  }
+
+  /**
+   * Parses an offset string produced by {@link #toString()}.
+   *
+   * @param metadata JSON representation of the offset
+   * @return the parsed offset
+   * @throws SamzaException if {@code metadata} is null
+   */
+  static KinesisSystemConsumerOffset parse(String metadata) {
+    if (metadata == null) {
+      // Previously surfaced as a SamzaException wrapping an NPE; keep the same exception type for callers.
+      throw new SamzaException("Cannot parse a null KinesisSystemConsumerOffset.");
+    }
+    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
+    return jsonSerde.fromBytes(metadata.getBytes(StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Serializes this offset to its JSON form, decoding the serde output as UTF-8 to match {@link #parse(String)}.
+   */
+  @Override
+  public String toString() {
+    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
+    return new String(jsonSerde.toBytes(this), StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof KinesisSystemConsumerOffset)) {
+      return false;
+    }
+    KinesisSystemConsumerOffset that = (KinesisSystemConsumerOffset) o;
+    return Objects.equals(shardId, that.getShardId()) && Objects.equals(seqNumber, that.getSeqNumber());
+  }
+
+  /** Null-safe hash, consistent with {@link #equals(Object)}, which also tolerates null fields. */
+  @Override
+  public int hashCode() {
+    int result = Objects.hashCode(shardId);
+    result = 31 * result + Objects.hashCode(seqNumber);
+    return result;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
new file mode 100644
index 0000000..6caf760
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+
+/**
+ * Thrown when the {@code SSPAllocator} cannot hand out an SSP because the free pool of the requested stream is
+ * exhausted (e.g. after dynamic resharding produced more shards than registered partitions).
+ */
+public class NoAvailablePartitionException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message description of the allocation failure
+   * @param cause underlying exception that led to the failure
+   */
+  public NoAvailablePartitionException(String message, Exception cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param message description of the allocation failure
+   */
+  public NoAvailablePartitionException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
new file mode 100644
index 0000000..4b7cff8
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Hands out Samza SystemStreamPartitions (SSPs), grouped per stream name. It provides two APIs:
+ * <ul>
+ *   <li> allocate: removes an arbitrary free ssp of the given stream from the pool and returns it.
+ *   <li> free: puts an ssp back into the free pool of its stream.
+ * </ul>
+ * When a stream's pool is empty, allocate throws NoAvailablePartitionException; this can happen when dynamic shard
+ * splits produce more shards than there are ssps. Both operations are synchronized on the allocator instance.
+ */
+class SSPAllocator {
+  private static final Logger LOG = LoggerFactory.getLogger(SSPAllocator.class.getName());
+
+  // Free ssps keyed by stream name; entries are created lazily by free().
+  private final Map<String, Set<SystemStreamPartition>> availableSsps = new HashMap<>();
+
+  synchronized SystemStreamPartition allocate(String stream) throws NoAvailablePartitionException {
+    Set<SystemStreamPartition> freePool = availableSsps.get(stream);
+    Validate.isTrue(freePool != null,
+        String.format("availableSsps is null for stream %s", stream));
+
+    if (freePool.isEmpty()) {
+      // NOTE(review): presumably the caller flags this so the system consumer can fail on a subsequent poll.
+      throw new NoAvailablePartitionException(String.format("More shards detected for stream %s than initially"
+          + " registered. Could be the result of dynamic resharding.", stream));
+    }
+
+    SystemStreamPartition allocated = freePool.iterator().next();
+    freePool.remove(allocated);
+
+    LOG.info("Number of unassigned partitions for system-stream {} is {}.", allocated.getSystemStream(),
+        freePool.size());
+    return allocated;
+  }
+
+  synchronized void free(SystemStreamPartition ssp) {
+    Set<SystemStreamPartition> freePool = availableSsps.computeIfAbsent(ssp.getStream(), p -> new HashSet<>());
+    boolean added = freePool.add(ssp);
+    Validate.isTrue(added, String.format("Ssp %s is already in free pool.", ssp));
+
+    LOG.info("Number of unassigned partitions for system-stream {} is {}.", ssp.getSystemStream(),
+        freePool.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
new file mode 100644
index 0000000..2f42981
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * KinesisSystemConsumerMetrics class has per-stream metrics and aggregate metrics across kinesis consumers
+ */
+
+public class KinesisSystemConsumerMetrics {
+
+  private final MetricsRegistry registry;
+
+  // Aggregate metrics across all kinesis system consumers
+  private static Counter aggEventReadRate = null;
+  private static Counter aggEventByteReadRate = null;
+  private static SamzaHistogram aggReadLatency = null;
+  private static SamzaHistogram aggMillisBehindLatest = null;
+
+  // Per-stream metrics
+  private Map<String, Counter> eventReadRates;
+  private Map<String, Counter> eventByteReadRates;
+  private Map<String, SamzaHistogram> readLatencies;
+  private Map<String, SamzaHistogram> millisBehindLatest;
+
+  private static final Object LOCK = new Object();
+
+  private static final String AGGREGATE = "aggregate";
+  private static final String EVENT_READ_RATE = "eventReadRate";
+  private static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
+  private static final String READ_LATENCY = "readLatency";
+  private static final String MILLIS_BEHIND_LATEST = "millisBehindLatest";
+
+  public KinesisSystemConsumerMetrics(MetricsRegistry registry) {
+    this.registry = registry;
+  }
+
+  public void initializeMetrics(Set<String> streamNames) {
+    eventReadRates = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
+    eventByteReadRates = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
+    readLatencies = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
+    millisBehindLatest = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(),
+            x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST)));
+
+    // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
+    synchronized (LOCK) {
+      if (aggEventReadRate == null) {
+        aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE);
+        aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE);
+        aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY);
+        aggMillisBehindLatest = new SamzaHistogram(registry, AGGREGATE, MILLIS_BEHIND_LATEST);
+      }
+    }
+  }
+
+  public void updateMillisBehindLatest(String stream, Long millisBehindLatest) {
+    this.millisBehindLatest.get(stream).update(millisBehindLatest);
+    aggMillisBehindLatest.update(millisBehindLatest);
+  }
+
+  public void updateMetrics(String stream, Record record) {
+    eventReadRates.get(stream).inc();
+    aggEventReadRate.inc();
+
+    long recordSize = record.getData().array().length + record.getPartitionKey().length();
+    eventByteReadRates.get(stream).inc(recordSize);
+    aggEventByteReadRate.inc(recordSize);
+
+    long latencyMs = Duration.between(Instant.now(), record.getApproximateArrivalTimestamp().toInstant()).toMillis();
+    readLatencies.get(stream).update(latencyMs);
+    aggReadLatency.update(latencyMs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
new file mode 100644
index 0000000..29964dc
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.metrics;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+
+
+/**
+ * A histogram whose selected percentiles are published as Samza gauges. Each gauge is registered in {@code group}
+ * under the name {@code <name>_<percentile>} and is refreshed on every {@link #update(long)}.
+ */
+class SamzaHistogram {
+
+  private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
+  private final MetricsRegistry registry;
+  private final Histogram histogram;
+  // Percentile in (0, 100] -> gauge publishing that percentile's current value.
+  private final Map<Double, Gauge<Double>> gauges;
+
+  SamzaHistogram(MetricsRegistry registry, String group, String name) {
+    this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
+  }
+
+  /**
+   * @param registry registry the percentile gauges are registered in
+   * @param group metrics group of the gauges
+   * @param name base name of the gauges; each gauge is suffixed with its percentile
+   * @param percentiles percentiles to publish; values outside (0, 100] are ignored and duplicates are collapsed
+   */
+  SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
+    this.registry = registry;
+    this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
+    // Each percentile needs its own gauge name; a shared name would make all percentiles overwrite one gauge.
+    // distinct() keeps toMap from failing on duplicate percentiles.
+    this.gauges = percentiles.stream()
+        .filter(x -> x > 0 && x <= 100)
+        .distinct()
+        .collect(Collectors.toMap(Function.identity(),
+            x -> this.registry.newGauge(group, name + "_" + String.valueOf(x), 0D)));
+  }
+
+  /**
+   * Adds {@code value} to the histogram and refreshes every percentile gauge from a fresh snapshot.
+   */
+  synchronized void update(long value) {
+    histogram.update(value);
+    Snapshot snapshot = histogram.getSnapshot();
+    // Iterate the registered gauges (not the raw percentile list) so filtered-out percentiles are skipped safely.
+    gauges.forEach((percentile, gauge) -> gauge.set(snapshot.getValue(percentile / 100)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
new file mode 100644
index 0000000..93887ed
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests for {@link KinesisAWSCredentialsProvider}: with both keys supplied the credentials carry them, and with
+ * either (or both) keys null the returned access and secret keys are both null.
+ */
+public class TestKinesisAWSCredentialsProvider {
+
+  @Test
+  public void testCredentialsProviderWithNonNullKeys() {
+    String accessKey = "accessKey";
+    String secretKey = "secretKey";
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(accessKey, secretKey);
+    // assertEquals takes (expected, actual); this order keeps JUnit failure messages accurate.
+    assertEquals(accessKey, credProvider.getCredentials().getAWSAccessKeyId());
+    assertEquals(secretKey, credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullAccessKey() {
+    String secretKey = "secretKey";
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(null, secretKey);
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullSecretKey() {
+    String accessKey = "accessKey";
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(accessKey, null);
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullKeys() {
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(null, null);
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
new file mode 100644
index 0000000..56e4810
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+
+import static org.junit.Assert.*;
+
+
+public class TestKinesisConfig {
+  @Test
+  public void testGetKinesisStreams() {
+    Map<String, String> kv = new HashMap<>();
+    kv.put("systems.kinesis.streams.kinesis-stream1.prop1", "value1");
+    kv.put("systems.kinesis.streams.kinesis-stream1.prop2", "value2");
+    kv.put("systems.kinesis.streams.kinesis-stream2.prop1", "value3");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    Set<String> streams = kConfig.getKinesisStreams("kinesis");
+    assertEquals(2, streams.size());
+  }
+
+  @Test
+  public void testKinesisConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String stream = "kinesis-stream";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+    String ssConfigPrefix = String.format("systems.%s.streams.%s.", system, stream);
+
+    kv.put("sensitive." + ssConfigPrefix + "aws.secretKey", "secretKey");
+    kv.put(systemConfigPrefix + "aws.region", "us-east-1");
+    kv.put(ssConfigPrefix + "aws.accessKey", "accessKey");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    assertEquals("us-east-1", kConfig.getRegion(system, stream).getName());
+    assertEquals("accessKey", kConfig.getStreamAccessKey(system, stream));
+    assertEquals("secretKey", kConfig.getStreamSecretKey(system, stream));
+  }
+
+  @Test
+  public void testAwsClientConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+
+    // Aws Client Configs
+    kv.put(systemConfigPrefix + "aws.clientConfig.ProxyHost", "hostName");
+    kv.put(systemConfigPrefix + "aws.clientConfig.ProxyPort", "8080");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    assertEquals("hostName", kConfig.getAWSClientConfig(system).getProxyHost());
+    assertEquals(8080, kConfig.getAWSClientConfig(system).getProxyPort());
+  }
+
+  @Test
+  public void testKclConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String stream = "kinesis-stream";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+
+    // region config is required for setting kcl config.
+    kv.put(systemConfigPrefix + "aws.region", "us-east-1");
+
+    // Kcl Configs
+    kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table");
+    kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100");
+    kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true");
+    kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON");
+    // override one of the Kcl configs for kinesis-stream1
+    kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    KinesisClientLibConfiguration kclConfig = kConfig.getKinesisClientLibConfig(system, stream, "sample-app");
+
+    assertEquals("sample-table", kclConfig.getTableName());
+    assertEquals(100, kclConfig.getMaxRecords());
+    assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList());
+    assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream());
+
+    // verify if the overriden config is applied for kinesis-stream1
+    kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app");
+    assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream());
+  }
+
+  @Test
+  public void testgetKCLConfigWithUnknownConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    kv.put("systems.kinesis.aws.region", "us-east-1");
+    kv.put("systems.kinesis.streams.kinesis-stream.aws.kcl.random", "value");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    // Should not throw any exception and just ignore the unknown configs.
+    kConfig.getKinesisClientLibConfig("kinesis", "kinesis-stream", "sample-app");
+  }
+}