You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/11/29 19:36:43 UTC

[4/8] samza git commit: SAMZA-1515; Implement a consumer for Kinesis

SAMZA-1515; Implement a consumer for Kinesis

Author: Aditya Toomula <at...@atoomula-ld1.linkedin.biz>

Reviewers: Jagadish<ja...@apache.org>

Closes #368 from atoomula/kinesis


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9961023f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9961023f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9961023f

Branch: refs/heads/0.14.0
Commit: 9961023f7bf7c4b19804fb4e50a14c86d6fc9233
Parents: 5e68d62
Author: Aditya Toomula <at...@atoomula-ld1.linkedin.biz>
Authored: Tue Nov 28 13:12:10 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 28 13:12:10 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  33 ++
 .../kinesis/KinesisAWSCredentialsProvider.java  |  69 +++++
 .../samza/system/kinesis/KinesisConfig.java     | 287 ++++++++++++++++++
 .../system/kinesis/KinesisSystemAdmin.java      | 124 ++++++++
 .../system/kinesis/KinesisSystemFactory.java    |  87 ++++++
 .../KinesisIncomingMessageEnvelope.java         |  62 ++++
 .../consumer/KinesisRecordProcessor.java        | 208 +++++++++++++
 .../KinesisRecordProcessorListener.java         |  51 ++++
 .../kinesis/consumer/KinesisSystemConsumer.java | 256 ++++++++++++++++
 .../consumer/KinesisSystemConsumerOffset.java   | 107 +++++++
 .../consumer/NoAvailablePartitionException.java |  38 +++
 .../system/kinesis/consumer/SSPAllocator.java   |  73 +++++
 .../metrics/KinesisSystemConsumerMetrics.java   | 106 +++++++
 .../system/kinesis/metrics/SamzaHistogram.java  |  63 ++++
 .../TestKinesisAWSCredentialsProvider.java      |  60 ++++
 .../samza/system/kinesis/TestKinesisConfig.java | 132 ++++++++
 .../kinesis/TestKinesisSystemFactory.java       | 115 +++++++
 .../consumer/TestKinesisRecordProcessor.java    | 301 +++++++++++++++++++
 .../consumer/TestKinesisSystemConsumer.java     | 270 +++++++++++++++++
 .../TestKinesisSystemConsumerOffset.java        |  48 +++
 .../kinesis/consumer/TestSSPAllocator.java      | 127 ++++++++
 settings.gradle                                 |   5 +-
 22 files changed, 2619 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 59ff5f2..eddb11c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -220,6 +220,39 @@ project(':samza-azure') {
   }
 }
 
+project(':samza-aws') { // AWS module; this commit adds the Kinesis system consumer to it
+  apply plugin: 'java'
+  apply plugin: 'checkstyle'
+
+  dependencies { // NOTE(review): sources import commons-lang3 and commons-lang, neither is declared here - confirm
+    compile "com.amazonaws:aws-java-sdk-kinesis:1.11.152" // AmazonKinesisClient, used by KinesisSystemAdmin
+    compile "com.amazonaws:amazon-kinesis-client:1.7.5" // Kinesis Client Library (KCL)
+    compile "com.amazonaws:amazon-kinesis-producer:0.10.0" // NOTE(review): getProducer returns null here - needed yet?
+    compile "io.dropwizard.metrics:metrics-core:3.1.2"
+    compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
+    compile "org.codehaus.jackson:jackson-mapper-asl:1.9.7"
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
+    compile "org.slf4j:slf4j-api:$slf4jVersion"
+    runtime "org.apache.httpcomponents:httpclient:4.5.2" // NOTE(review): KinesisConfig imports org.apache.http.* - confirm
+    runtime "org.apache.httpcomponents:httpcore:4.4.5"
+    testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
+  }
+
+  repositories {
+    maven {
+      url "https://repo1.maven.org/maven2/"
+    }
+  }
+
+  checkstyle { // same checkstyle rules file as configured from the root directory
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+    toolVersion = "$checkstyleVersion"
+  }
+}
+
+
 project(":samza-autoscaling_$scalaVersion") {
   apply plugin: 'scala'
   apply plugin: 'checkstyle'

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
new file mode 100644
index 0000000..a37cfb4
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisAWSCredentialsProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+
+
+/**
+ * An {@link AWSCredentialsProvider} backed by an accessKey/secretKey pair that is handed in directly. Only when both
+ * keys are non-empty is a BasicAWSCredentials instance created; if either one is null or empty, the provider serves
+ * an AWSCredentials instance whose access key and secret key are both null.
+ */
+public class KinesisAWSCredentialsProvider implements AWSCredentialsProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisAWSCredentialsProvider.class.getName());
+  private final AWSCredentials credentials;
+
+  public KinesisAWSCredentialsProvider(String accessKey, String secretKey) {
+    if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
+      credentials = new BasicAWSCredentials(accessKey, secretKey);
+      LOG.info("Loaded credentials from KinesisAWSCredentialsProvider");
+    } else {
+      // Placeholder credentials that carry no keys at all.
+      credentials = new AWSCredentials() {
+        @Override
+        public String getAWSAccessKeyId() {
+          return null;
+        }
+        @Override
+        public String getAWSSecretKey() {
+          return null;
+        }
+      };
+      LOG.info("Could not load credentials from KinesisAWSCredentialsProvider");
+    }
+  }
+
+  @Override
+  public AWSCredentials getCredentials() {
+    return credentials;
+  }
+
+  @Override
+  public void refresh() {
+    //no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
new file mode 100644
index 0000000..a4ac40d
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+import com.amazonaws.ClientConfiguration;
+
+
+/**
+ * Configs for Kinesis system. It contains three sets of configs:
+ * <ol>
+ *   <li> Configs required by Samza Kinesis Consumer.
+ *   <li> Configs that are AWS client specific provided at system scope {@link ClientConfiguration}
+ *   <li> Configs that are KCL specific (could be provided either at system scope or stream scope)
+ *        {@link KinesisClientLibConfiguration}
+ * </ol>
+ */
+public class KinesisConfig extends MapConfig {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName());
+
+  private static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region";
+  private static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region";
+
+  private static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey";
+  private static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey";
+
+  private static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig.";
+  private static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost";
+  private static final String DEFAULT_CONFIG_PROXY_HOST = "";
+  private static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort";
+  private static final int DEFAULT_CONFIG_PROXY_PORT = 0;
+
+  private static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl.";
+  private static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl.";
+
+  public KinesisConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * Collect the names of all streams that have at least one property configured under the given system.
+   * @param system name of the system
+   * @return a set of streams
+   * @throws IllegalArgumentException if a stream-level key has no '.' separating the stream name from the property
+   */
+  public Set<String> getKinesisStreams(String system) {
+    // narrow down to stream-level configs; every remaining key is expected to start with the stream name
+    Config streamScoped = subset(String.format("systems.%s.streams.", system), true);
+    Set<String> streamNames = new HashSet<>();
+    for (String key : streamScoped.keySet()) {
+      String[] pieces = key.split("\\.", 2);
+      if (pieces.length != 2) {
+        throw new IllegalArgumentException("Ill-formatted stream config: " + key);
+      }
+      streamNames.add(pieces[0]);
+    }
+    return streamNames;
+  }
+
+  /**
+   * Build the KCL configuration for a given system stream. The initial position is LATEST unless it is overridden
+   * through the system scoped or stream scoped "aws.kcl." configs.
+   * @param system name of the system
+   * @param stream name of the stream
+   * @param appName name of the application
+   * @return Stream scoped KCL configs as a {@link KinesisClientLibConfiguration}
+   */
+  public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) {
+    ClientConfiguration awsClientConfig = getAWSClientConfig(system);
+    // Every call yields a fresh worker id: the application name followed by a random UUID.
+    String workerId = appName + "-" + UUID.randomUUID();
+    AWSCredentialsProvider credentialsProvider = credentialsProviderForStream(system, stream);
+    KinesisClientLibConfiguration kclConfig =
+        new KinesisClientLibConfiguration(appName, stream, credentialsProvider, workerId)
+            .withRegionName(getRegion(system, stream).getName())
+            .withKinesisClientConfig(awsClientConfig)
+            .withCloudWatchClientConfig(awsClientConfig)
+            .withDynamoDBClientConfig(awsClientConfig)
+            .withInitialPositionInStream(InitialPositionInStream.LATEST)
+            .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics.
+    // Apply the system scoped KCL configs first so that the stream scoped ones, applied next, override them.
+    Config systemScopedKcl = subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system));
+    Config streamScopedKcl = subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream));
+    setKinesisClientLibConfigs(systemScopedKcl, kclConfig);
+    setKinesisClientLibConfigs(streamScopedKcl, kclConfig);
+    return kclConfig;
+  }
+
+  /**
+   * Get the Kinesis secret key for the system stream
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis secret key
+   */
+  protected String getStreamSecretKey(String system, String stream) {
+    return get(String.format(CONFIG_STREAM_SECRET_KEY, system, stream));
+  }
+
+  /**
+   * Get SSL socket factory for the proxy for a given system. This implementation always returns null.
+   * @param system name of the system
+   * @return ConnectionSocketFactory, or null when none is supplied
+   */
+  protected ConnectionSocketFactory getSSLSocketFactory(String system) {
+    return null;
+  }
+
+  /**
+   * @param system name of the system
+   * @return {@link ClientConfiguration} which has options controlling how the client connects to kinesis
+   *         (eg: proxy settings, retry counts, etc)
+   */
+  ClientConfiguration getAWSClientConfig(String system) {
+    ClientConfiguration awsClientConfig = new ClientConfiguration();
+    setAwsClientConfigs(subset(String.format(CONFIG_AWS_CLIENT_CONFIG, system)), awsClientConfig);
+    awsClientConfig.getApacheHttpClientConfig().setSslSocketFactory(getSSLSocketFactory(system));
+    return awsClientConfig;
+  }
+
+  /**
+   * Get the proxy host as a system level config. This is needed when
+   * users need to go through a proxy for the Kinesis connections.
+   * @param system name of the system
+   * @return proxy host name or empty string if not defined
+   */
+  String getProxyHost(String system) {
+    return get(String.format(CONFIG_PROXY_HOST, system), DEFAULT_CONFIG_PROXY_HOST);
+  }
+
+  /**
+   * Get the proxy port number as a system level config. This is needed when
+   * users need to go through a proxy for the Kinesis connections.
+   * @param system name of the system
+   * @return proxy port number or 0 if not defined
+   */
+  int getProxyPort(String system) {
+    return getInt(String.format(CONFIG_PROXY_PORT, system), DEFAULT_CONFIG_PROXY_PORT);
+  }
+
+  /**
+   * Get the Kinesis region for the system stream. The stream scoped region wins over the system scoped one.
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis region
+   */
+  Region getRegion(String system, String stream) {
+    String systemRegion = get(String.format(CONFIG_SYSTEM_REGION, system));
+    String regionName = get(String.format(CONFIG_STREAM_REGION, system, stream), systemRegion);
+    return Region.getRegion(Regions.fromName(regionName));
+  }
+
+  /**
+   * Get the Kinesis access key name for the system stream
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return Kinesis access key
+   */
+  String getStreamAccessKey(String system, String stream) {
+    return get(String.format(CONFIG_STREAM_ACCESS_KEY, system, stream));
+  }
+
+  /**
+   * Build the credentials provider chain used for a given system stream.
+   * @param system name of the system
+   * @param stream name of the stream
+   * @return AWSCredentialsProvider that lists the configured keys first and the AWS default chain second
+   */
+  AWSCredentialsProvider credentialsProviderForStream(String system, String stream) {
+    // First choice: the access key and secret key configured for this stream.
+    AWSCredentialsProvider fromConfig =
+        new KinesisAWSCredentialsProvider(getStreamAccessKey(system, stream), getStreamSecretKey(system, stream));
+    // Fallback: the default credential provider chain (environment variables, system properties, AWS profile file, etc)
+    AWSCredentialsProvider fromDefaults = new DefaultAWSCredentialsProviderChain();
+    return new AWSCredentialsProviderChain(fromConfig, fromDefaults);
+  }
+
+  private void setAwsClientConfigs(Config config, ClientConfiguration clientConfig) {
+    for (Entry<String, String> entry : config.entrySet()) {
+      boolean found = false;
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (StringUtils.isEmpty(value)) {
+        continue;
+      }
+      String loggedValue = key.contains("Password") ? "********" : value; // keep e.g. ProxyPassword out of the logs
+      for (Method method : ClientConfiguration.class.getMethods()) { // look for the setter named after the property
+        if (method.getName().equals("set" + key)) {
+          found = true;
+          Class<?> type = method.getParameterTypes()[0];
+          try {
+            if (type == long.class) {
+              method.invoke(clientConfig, Long.valueOf(value));
+            } else if (type == int.class) {
+              method.invoke(clientConfig, Integer.valueOf(value));
+            } else if (type == boolean.class) {
+              method.invoke(clientConfig, Boolean.valueOf(value));
+            } else if (type == String.class) {
+              method.invoke(clientConfig, value);
+            }
+            LOG.info("Loaded property " + key + " = " + loggedValue);
+            break;
+          } catch (Exception e) {
+            throw new IllegalArgumentException(
+                String.format("Error trying to set field %s with the value '%s'", key, loggedValue), e);
+          }
+        }
+      }
+      if (!found) {
+        LOG.warn("Property " + key + " ignored as there is no corresponding set method");
+      }
+    }
+  }
+
+  private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) {
+    for (Entry<String, String> entry : config.entrySet()) {
+      boolean found = false;
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (StringUtils.isEmpty(value)) {
+        continue;
+      }
+      for (Method method : KinesisClientLibConfiguration.class.getMethods()) { // look for the "with" + key method
+        if (method.getName().equals("with" + key)) {
+          found = true;
+          Class<?> type = method.getParameterTypes()[0];
+          try {
+            if (type == long.class) {
+              method.invoke(kinesisLibConfig, Long.valueOf(value));
+            } else if (type == int.class) {
+              method.invoke(kinesisLibConfig, Integer.valueOf(value));
+            } else if (type == boolean.class) {
+              method.invoke(kinesisLibConfig, Boolean.valueOf(value));
+            } else if (type == String.class) {
+              method.invoke(kinesisLibConfig, value);
+            } else if (type == InitialPositionInStream.class) { // Locale.ROOT: lookup must not depend on the JVM locale
+              method.invoke(kinesisLibConfig, InitialPositionInStream.valueOf(value.toUpperCase(java.util.Locale.ROOT)));
+            }
+            LOG.info("Loaded property " + key + " = " + value);
+            break;
+          } catch (Exception e) {
+            throw new IllegalArgumentException(
+                String.format("Error trying to set field %s with the value '%s'", key, value), e);
+          }
+        }
+      }
+      if (!found) {
+        LOG.warn("Property " + key + " ignored as there is no corresponding set method");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
new file mode 100644
index 0000000..4843276
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemAdmin.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+
+
+/**
+ * A Kinesis-based implementation of SystemAdmin.
+ */
+public class KinesisSystemAdmin implements SystemAdmin {
+
+  private static final SystemStreamMetadata.SystemStreamPartitionMetadata SYSTEM_STREAM_PARTITION_METADATA =
+      new SystemStreamMetadata.SystemStreamPartitionMetadata(ExtendedSequenceNumber.TRIM_HORIZON.getSequenceNumber(),
+          ExtendedSequenceNumber.LATEST.getSequenceNumber(),
+          ExtendedSequenceNumber.LATEST.getSequenceNumber());
+
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemAdmin.class.getName());
+
+  private final String system;
+  private final KinesisConfig kConfig;
+
+  public KinesisSystemAdmin(String system, KinesisConfig kConfig) {
+    this.system = system;
+    this.kConfig = kConfig;
+  }
+
+  /**
+   * Source of truth for checkpointing is always kinesis and the offsets written to samza checkpoint topic are ignored.
+   * Hence, return null for the getOffsetsAfter for a supplied map of ssps.
+   */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    Map<SystemStreamPartition, String> offsetsAfter = new HashMap<>();
+
+    for (SystemStreamPartition systemStreamPartition : offsets.keySet()) {
+      offsetsAfter.put(systemStreamPartition, null);
+    }
+
+    return offsetsAfter;
+  }
+
+  /**
+   * Source of truth for checkpointing is always kinesis and the offsets given by samza are always ignored by KCL.
+   * Hence, return a placeholder for each ssp.
+   */
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return streamNames.stream().collect(Collectors.toMap(Function.identity(), this::createSystemStreamMetadata));
+  }
+
+  private SystemStreamMetadata createSystemStreamMetadata(String stream) {
+    LOG.info("create stream metadata for stream {} based on aws stream", stream);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadata = new HashMap<>();
+    AmazonKinesisClient client = null;
+
+    try {
+      ClientConfiguration clientConfig = kConfig.getAWSClientConfig(system);
+      AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+          .withCredentials(kConfig.credentialsProviderForStream(system, stream))
+          .withClientConfiguration(clientConfig);
+      builder.setRegion(kConfig.getRegion(system, stream).getName());
+      client = (AmazonKinesisClient) builder.build();
+      StreamDescription desc = client.describeStream(stream).getStreamDescription();
+      IntStream.range(0, desc.getShards().size())
+          .forEach(i -> metadata.put(new Partition(i), SYSTEM_STREAM_PARTITION_METADATA));
+    } catch (Exception e) {
+      String errMsg = "couldn't load metadata for stream " + stream;
+      LOG.error(errMsg, e);
+      throw new SamzaException(errMsg, e);
+    } finally {
+      if (client != null) {
+        client.shutdown();
+      }
+    }
+
+    return new SystemStreamMetadata(stream, metadata);
+  }
+
+  /**
+   * Checkpoints are written to KCL and is always the source of truth. Format for Samza offsets is different from
+   * that of Kinesis checkpoint. Samza offsets are not comparable. Hence, return null.
+   */
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
new file mode 100644
index 0000000..558e871
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisSystemFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.TaskConfigJava;
+import org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+
+import org.apache.samza.system.kinesis.consumer.KinesisSystemConsumer;
+
+
+/**
+ * A Kinesis-based implementation of SystemFactory.
+ */
+public class KinesisSystemFactory implements SystemFactory {
+  @Override
+  public SystemConsumer getConsumer(String system, Config config, MetricsRegistry registry) {
+    return new KinesisSystemConsumer(system, new KinesisConfig(config), registry);
+  }
+
+  @Override
+  public SystemProducer getProducer(String system, Config config, MetricsRegistry registry) {
+    return null; // this factory does not provide a producer
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String system, Config config) {
+    validateConfig(system, config);
+    return new KinesisSystemAdmin(system, new KinesisConfig(config));
+  }
+
+  /**
+   * Rejects configs this system cannot run with: a grouper other than AllSspToSingleTaskGrouperFactory, or Kinesis
+   * streams configured as broadcast or bootstrap streams. Throws ConfigException on the first violation found.
+   */
+  protected void validateConfig(String system, Config config) {
+    // Kinesis system does not support groupers other than AllSspToSingleTaskGrouper
+    String requiredGrouper = AllSspToSingleTaskGrouperFactory.class.getCanonicalName();
+    String configuredGrouper = new JobConfig(config).getSystemStreamPartitionGrouperFactory();
+    if (!configuredGrouper.equals(requiredGrouper)) {
+      String errMsg = String.format("Incorrect Grouper %s used for KinesisSystemConsumer %s. Please set the %s config"
+          + " to %s.", configuredGrouper, system, JobConfig.SSP_GROUPER_FACTORY(), requiredGrouper);
+      throw new ConfigException(errMsg);
+    }
+
+    // Kinesis streams cannot be configured as broadcast streams
+    TaskConfigJava taskConfig = new TaskConfigJava(config);
+    if (taskConfig.getBroadcastSystemStreams().stream().anyMatch(ss -> system.equals(ss.getSystem()))) {
+      throw new ConfigException("Kinesis streams cannot be configured as broadcast streams.");
+    }
+
+    // Kinesis streams cannot be configured as bootstrap streams
+    KinesisConfig kConfig = new KinesisConfig(config);
+    StreamConfig streamConfig = new StreamConfig(kConfig); // built once; it does not depend on the stream
+    for (String stream : kConfig.getKinesisStreams(system)) {
+      if (streamConfig.getBootstrapEnabled(new SystemStream(system, stream))) {
+        throw new ConfigException("Kinesis streams cannot be configured as bootstrap streams.");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
new file mode 100644
index 0000000..95e6b6a
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.Date;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Incoming message envelope for a Kinesis record; adds shard id, sequence number and approximate arrival timestamp.
+ */
+public class KinesisIncomingMessageEnvelope extends IncomingMessageEnvelope {
+  private final String shardId;
+  private final String sequenceNumber;
+  private final Date approximateArrivalTimestamp;
+
+  public KinesisIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key,
+      Object message, String shardId, String sequenceNumber, Date approximateArrivalTimestamp) {
+    super(systemStreamPartition, offset, key, message);
+    this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+    this.sequenceNumber = sequenceNumber;
+    this.shardId = shardId;
+  }
+
+  public String getShardId() {
+    return this.shardId;
+  }
+
+  public String getSequenceNumber() {
+    return this.sequenceNumber;
+  }
+
+  public Date getApproximateArrivalTimestamp() {
+    return this.approximateArrivalTimestamp;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("KinesisIncomingMessageEnvelope:: shardId:").append(shardId)
+        .append(", sequenceNumber:").append(sequenceNumber).append(", approximateArrivalTimestamp:")
+        .append(approximateArrivalTimestamp).append(", message:").append(getMessage()).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
new file mode 100644
index 0000000..53ff27f
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.List;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * Record processor for AWS kinesis stream. It does the following:
+ * <ul>
+ *   <li> when a shard is assigned by KCL in initialize API, it records the shard id and the initial sequence number.
+ *   <li> when records are received in processRecords API, it hands them to the {@link KinesisRecordProcessorListener}
+ *        together with the ssp this processor was created for.
+ *   <li> when checkpoint API is called by samza, it checkpoints via KCL to Kinesis.
+ *   <li> when shutdown API is called by KCL, based on the terminate reason, it takes necessary action.
+ * </ul>
+ *
+ * initialize, processRecords and shutdown APIs are never called concurrently on a processor instance. However,
+ * checkpoint API could be called by Samza thread while processRecords and shutdown callback APIs are invoked by KCL.
+ * For that reason every field that is written by a KCL callback and read by the Samza thread is volatile.
+ * Please note that the APIs for different record processor instances could be called concurrently.
+ */
+
+public class KinesisRecordProcessor implements IRecordProcessor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordProcessor.class.getName());
+  static final long POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS = 1000;
+
+  private final SystemStreamPartition ssp;
+  private final KinesisRecordProcessorListener listener;
+
+  // Written by KCL callbacks (initialize/processRecords) and read by the Samza thread (checkpoint/getShardId).
+  // initSeqNumber is assigned before shardId, so a reader that observes a non-null shardId also observes the
+  // initial sequence number.
+  private volatile ExtendedSequenceNumber initSeqNumber;
+  private volatile String shardId;
+  private volatile IRecordProcessorCheckpointer checkpointer;
+
+  private volatile ExtendedSequenceNumber lastProcessedRecordSeqNumber;
+  private volatile ExtendedSequenceNumber lastCheckpointedRecordSeqNumber;
+
+  // Only touched from the KCL callbacks, which are never invoked concurrently on one processor instance.
+  private boolean shutdownRequested = false;
+
+  /**
+   * @param ssp the Samza partition whose records this processor forwards to the listener
+   * @param listener callback target notified about received records and about shutdown
+   */
+  KinesisRecordProcessor(SystemStreamPartition ssp, KinesisRecordProcessorListener listener) {
+    this.ssp = ssp;
+    this.listener = listener;
+  }
+
+  /**
+   * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
+   * (via processRecords).
+   *
+   * @param initializationInput Provides information related to initialization
+   */
+  @Override
+  public void initialize(InitializationInput initializationInput) {
+    Validate.isTrue(listener != null, "There is no listener set for the processor.");
+    initSeqNumber = initializationInput.getExtendedSequenceNumber();
+    shardId = initializationInput.getShardId();
+    LOG.info("Initialization done for {} with sequence {}", this,
+        initializationInput.getExtendedSequenceNumber().getSequenceNumber());
+  }
+
+  /**
+   * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
+   * application. Upon fail over, the new instance will get records with sequence number greater than the checkpoint
+   * position for each partition key.
+   *
+   * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
+   *        to them (eg checkpointing).
+   */
+  @Override
+  public void processRecords(ProcessRecordsInput processRecordsInput) {
+    // KCL does not send any records to the processor that was shutdown.
+    Validate.isTrue(!shutdownRequested,
+        String.format("KCL returned records after shutdown is called on the processor %s.", this));
+    // KCL always gives reference to the same checkpointer instance for a given processor instance.
+    checkpointer = processRecordsInput.getCheckpointer();
+    List<Record> records = processRecordsInput.getRecords();
+    // Empty records are expected when KCL config has CallProcessRecordsEvenForEmptyRecordList set to true.
+    if (!records.isEmpty()) {
+      lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber());
+      listener.onReceiveRecords(ssp, records, processRecordsInput.getMillisBehindLatest());
+    }
+  }
+
+  /**
+   * Invoked by the Samza thread to commit checkpoint for the shard owned by the record processor instance.
+   * The request is ignored (with a warning) when the sequence number is smaller than the initial sequence number
+   * of this processor, or when no checkpointer has been received from KCL yet.
+   *
+   * @param seqNumber sequenceNumber to checkpoint for the shard owned by this processor instance.
+   * @throws SamzaException if KCL reports an invalid state or throttles the checkpoint
+   */
+  public void checkpoint(String seqNumber) {
+    ExtendedSequenceNumber seqNumberToCheckpoint = new ExtendedSequenceNumber(seqNumber);
+    if (initSeqNumber.compareTo(seqNumberToCheckpoint) > 0) {
+      LOG.warn("Samza called checkpoint with seqNumber {} smaller than initial seqNumber {} for {}. Ignoring it!",
+          seqNumber, initSeqNumber, this);
+      return;
+    }
+
+    // Read the volatile field once so that the null check and the call below use the same instance.
+    IRecordProcessorCheckpointer currentCheckpointer = checkpointer;
+    if (currentCheckpointer == null) {
+      // checkpointer could be null as a result of shard re-assignment before the first record is processed.
+      LOG.warn("Ignoring checkpointing for {} with seqNumber {} because of re-assignment.", this, seqNumber);
+      return;
+    }
+
+    try {
+      currentCheckpointer.checkpoint(seqNumber);
+      lastCheckpointedRecordSeqNumber = seqNumberToCheckpoint;
+    } catch (ShutdownException e) {
+      // This can happen as a result of shard re-assignment.
+      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Dropping the checkpoint.",
+          this, seqNumber);
+      LOG.warn(msg, e);
+    } catch (InvalidStateException e) {
+      // This can happen when KCL encounters issues with internal state, eg: dynamoDB table is not found
+      String msg =
+          String.format("Checkpointing %s with seqNumber %s failed with exception.", this, seqNumber);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    } catch (ThrottlingException e) {
+      // Throttling is handled by KCL via the client lib configuration properties. If we get an exception in spite of
+      // the throttling back-off behavior, let's throw an exception as the configs need to be adjusted. The original
+      // exception is kept as the cause so that the failure can be diagnosed.
+      String msg = String.format("Checkpointing %s with seqNumber %s failed with exception. Checkpoint interval is"
+              + " too aggressive for the provisioned throughput of the dynamoDB table where the checkpoints are stored."
+              + " Either increase the checkpoint interval -or- increase the throughput of dynamoDB table.", this,
+          seqNumber);
+      throw new SamzaException(msg, e);
+    }
+  }
+
+  /**
+   * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
+   * RecordProcessor instance.
+   *
+   * @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record
+   *        processor.
+   */
+  @Override
+  public void shutdown(ShutdownInput shutdownInput) {
+    LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason());
+
+    Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this));
+    shutdownRequested = true;
+    // shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the
+    // records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the
+    // progress.
+    if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
+      // We need to ensure that all records are processed and checkpointed before going ahead and marking the processing
+      // complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL
+      // to consume from the child shard(s).
+      try {
+        LOG.info("Waiting for all the records for {} to be processed.", this);
+        // Let's poll periodically and block until the last processed record is checkpointed. Also handle the case
+        // where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will
+        // be null.
+        while (lastProcessedRecordSeqNumber != null
+            && !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) {
+          Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS);
+        }
+        LOG.info("Final checkpoint for {} before shutting down.", this);
+        shutdownInput.getCheckpointer().checkpoint();
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so that the calling thread can still observe the interruption.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting to commit the final checkpoint in the parent shard {}", this, e);
+      } catch (Exception e) {
+        LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e);
+      }
+    }
+    listener.onShutdown(ssp);
+  }
+
+  /**
+   * @return the shard id received in {@link #initialize(InitializationInput)}, or null before initialization
+   */
+  String getShardId() {
+    return shardId;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("KinesisRecordProcessor: ssp %s shard %s hashCode %s", ssp, shardId, hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
new file mode 100644
index 0000000..72d86b9
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessorListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.List;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * Listener interface implemented by the consumer to be notified when a {@link KinesisRecordProcessor} receives
+ * records and when it is ready to shut down.
+ */
+public interface KinesisRecordProcessorListener {
+  /**
+   * Method invoked by
+   * {@link KinesisRecordProcessor#processRecords(com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput)}
+   * when a non-empty batch of records is received by the processor.
+   * @param ssp Samza partition to which the records belong
+   * @param records List of kinesis records
+   * @param millisBehindLatest Time lag of the batch of records with respect to the tip of the stream
+   */
+  void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest);
+
+  /**
+   * Method invoked by
+   * {@link KinesisRecordProcessor#shutdown(com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput)}
+   * as its last step, when the processor is ready to shut down.
+   * @param ssp Samza partition for which the shutdown is invoked
+   */
+  void onShutdown(SystemStreamPartition ssp);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
new file mode 100644
index 0000000..6afffd3
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointListener;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.kinesis.KinesisConfig;
+import org.apache.samza.system.kinesis.metrics.KinesisSystemConsumerMetrics;
+import org.apache.samza.util.BlockingEnvelopeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
+import com.amazonaws.services.kinesis.model.Record;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * The system consumer for Kinesis, extending the {@link BlockingEnvelopeMap}.
+ *
+ * The system consumer creates a KinesisWorker per stream in its own thread by providing a RecordProcessorFactory.
+ * Kinesis Client Library (KCL) uses this factory to instantiate a KinesisRecordProcessor for each shard in the Kinesis
+ * stream. KCL pushes data records to the appropriate record processor and the processor is responsible for processing
+ * the resulting records and place them into a blocking queue in {@link BlockingEnvelopeMap}.
+ *
+ * <pre>
+ *   {@code
+ *                                                                                Shard1  +----------------------+
+ *                                                                . --------------------> |KinesisRecordProcessor|
+ *                        Stream1                                 |               Shard2  +----------------------+
+ *                              +-------------+     +-----------------------------+       +----------------------+
+ *             .--------------->|    Worker   |---->|    RecordProcessorFactory   | ----> |KinesisRecordProcessor|
+ *             |                +-------------+     +-------------+---------------+       +----------------------+
+ *             |                                                  |               Shard3  +----------------------+
+ *             |                                                  . --------------------> |KinesisRecordProcessor|
+ *             |                                                                          +----------------------+
+ *             |          Stream2
+ *  +---------------------+     +-------------+     +-----------------------------+        +-------+
+ *  |KinesisSystemConsumer|---->|    Worker   |---->|    RecordProcessorFactory   |------->|  ...  |
+ *  +---------------------+     +-------------+     +-----------------------------+        +-------+
+ *             |
+ *             |
+ *             |
+ *             |
+ *             |                +-----------+
+ *             . -------------->|    ...    |
+ *                              +-----------+
+ *  }
+ *  </pre>
+ * Since KinesisSystemConsumer uses KCL, the checkpoint state is stored in a dynamoDB table which is maintained by KCL.
+ * KinesisSystemConsumer implements CheckpointListener to commit checkpoints via KCL.
+ */
+
+public class KinesisSystemConsumer extends BlockingEnvelopeMap implements CheckpointListener, KinesisRecordProcessorListener {
+
+  private static final int MAX_BLOCKING_QUEUE_SIZE = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSystemConsumer.class.getName());
+
+  private final String system;
+  private final KinesisConfig kConfig;
+  private final KinesisSystemConsumerMetrics metrics;
+  private final SSPAllocator sspAllocator;
+
+  private final Set<String> streams = new HashSet<>();
+  private final Map<SystemStreamPartition, KinesisRecordProcessor> processors = new ConcurrentHashMap<>();
+  private final List<Worker> workers = new LinkedList<>();
+
+  // Created in start(); stays null when start() was never called or when there was nothing to start.
+  private ExecutorService executorService;
+
+  // Set from a Kinesis thread when a record processor cannot be created; surfaced to Samza on the next poll().
+  private volatile Exception callbackException;
+
+  /**
+   * @param systemName name of the Samza system this consumer belongs to
+   * @param kConfig configuration used to build the KCL configuration of every worker
+   * @param registry metrics registry handed to the parent class and to the consumer metrics
+   */
+  public KinesisSystemConsumer(String systemName, KinesisConfig kConfig, MetricsRegistry registry) {
+    super(registry, System::currentTimeMillis, null);
+    this.system = systemName;
+    this.kConfig = kConfig;
+    this.metrics = new KinesisSystemConsumerMetrics(registry);
+    this.sspAllocator = new SSPAllocator();
+  }
+
+  /**
+   * @return a bounded queue, so that a producer blocks once {@value #MAX_BLOCKING_QUEUE_SIZE} envelopes are pending
+   */
+  @Override
+  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
+    return new LinkedBlockingQueue<>(MAX_BLOCKING_QUEUE_SIZE);
+  }
+
+  /**
+   * Enqueues the envelope for the given ssp. Any failure is logged and the interrupt flag of the calling thread is
+   * set; the exception itself is not propagated to the caller.
+   */
+  @Override
+  protected void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+    try {
+      super.put(ssp, envelope);
+    } catch (Exception e) {
+      LOG.error("Exception while putting record. Shutting down SystemStream {}", ssp.getSystemStream(), e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Remembers the stream of the ssp so that a worker is created for it in {@link #start()} and adds the ssp to the
+   * free pool of the allocator. The offset is not used by this consumer.
+   */
+  @Override
+  public void register(SystemStreamPartition ssp, String offset) {
+    LOG.info("Register called with ssp {} and offset {}. Offset will be ignored.", ssp, offset);
+    String stream = ssp.getStream();
+    streams.add(stream);
+    sspAllocator.free(ssp);
+    super.register(ssp, offset);
+  }
+
+  /**
+   * Creates one KCL worker per registered stream and runs each of them on its own thread. Does nothing besides
+   * initializing the metrics when no stream has been registered.
+   */
+  @Override
+  public void start() {
+    LOG.info("Start samza consumer for system {}.", system);
+
+    metrics.initializeMetrics(streams);
+
+    if (streams.isEmpty()) {
+      // Executors.newFixedThreadPool rejects a pool size of zero with an IllegalArgumentException.
+      LOG.warn("No streams are registered for system {}. No kinesis workers are started.", system);
+      return;
+    }
+
+    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("kinesis-worker-thread-" + system + "-%d")
+        .build();
+    // launch kinesis workers in separate threads, one per stream
+    executorService = Executors.newFixedThreadPool(streams.size(), namedThreadFactory);
+
+    for (String stream : streams) {
+      // KCL Dynamodb table is used for storing the state of processing. By default, the table name is the same as the
+      // application name. Dynamodb table name must be unique for a given account and region (even across different
+      // streams). So, let's create the default one with the combination of job name, job id and stream name. The table
+      // name could be changed by providing a different TableName via KCL specific config.
+      String kinesisApplicationName =
+          kConfig.get(JobConfig.JOB_NAME()) + "-" + kConfig.get(JobConfig.JOB_ID()) + "-" + stream;
+
+      Worker worker = new Worker.Builder()
+          .recordProcessorFactory(createRecordProcessorFactory(stream))
+          .config(kConfig.getKinesisClientLibConfig(system, stream, kinesisApplicationName))
+          .build();
+
+      workers.add(worker);
+
+      // the pool has one thread per stream, so every worker runs on a thread of its own
+      executorService.execute(worker);
+      LOG.info("Started worker for system {} stream {}.", system, stream);
+    }
+  }
+
+  /**
+   * Delegates to the parent class unless a record processor could not be created earlier, in which case the recorded
+   * exception is rethrown wrapped in a {@link SamzaException}.
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+      Set<SystemStreamPartition> ssps, long timeout) throws InterruptedException {
+    if (callbackException != null) {
+      throw new SamzaException(callbackException);
+    }
+    return super.poll(ssps, timeout);
+  }
+
+  /**
+   * Shuts down all workers and the executor service. Safe to call even if {@link #start()} did not create an
+   * executor service.
+   */
+  @Override
+  public void stop() {
+    LOG.info("Stop samza consumer for system {}.", system);
+    workers.forEach(Worker::shutdown);
+    workers.clear();
+    if (executorService != null) {
+      executorService.shutdownNow();
+      LOG.info("Kinesis system consumer executor service for system {} is shutdown.", system);
+    }
+  }
+
+  // package-private for tests
+  IRecordProcessorFactory createRecordProcessorFactory(String stream) {
+    return () -> {
+      // This code is executed in Kinesis thread context.
+      try {
+        SystemStreamPartition ssp = sspAllocator.allocate(stream);
+        KinesisRecordProcessor processor = new KinesisRecordProcessor(ssp, KinesisSystemConsumer.this);
+        KinesisRecordProcessor prevProcessor = processors.put(ssp, processor);
+        Validate.isTrue(prevProcessor == null, String.format("Adding new kinesis record processor %s while the"
+                + " previous processor %s for the same ssp %s is still active.", processor, prevProcessor, ssp));
+        return processor;
+      } catch (Exception e) {
+        callbackException = e;
+        // This exception is the result of kinesis dynamic shard splits due to which sspAllocator ran out of free ssps.
+        // Set the failed state in consumer which will eventually result in stopping the container. A manual job restart
+        // will be required at this point. After the job restart, the newly created shards will be discovered and enough
+        // ssps will be added to sspAllocator freePool.
+        throw new SamzaException(e);
+      }
+    };
+  }
+
+  /**
+   * Forwards every checkpointed offset to the record processor that currently owns the ssp. A checkpoint is dropped
+   * when there is no processor for the ssp or when the processor owns a different shard than the one in the offset.
+   */
+  @Override
+  public void onCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
+    LOG.info("onCheckpoint called with sspOffsets {}", sspOffsets);
+    sspOffsets.forEach((ssp, offset) -> {
+        KinesisRecordProcessor processor = processors.get(ssp);
+        KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
+        if (processor == null) {
+          LOG.info("Kinesis Processor is not alive for ssp {}. This could be the result of rebalance. Hence dropping the"
+              + " checkpoint {}.", ssp, offset);
+        } else if (!kinesisOffset.getShardId().equals(processor.getShardId())) {
+          LOG.info("KinesisProcessor for ssp {} currently owns shard {} while the checkpoint is for shard {}. This could"
+              + " be the result of rebalance. Hence dropping the checkpoint {}.", ssp, processor.getShardId(),
+              kinesisOffset.getShardId(), offset);
+        } else {
+          processor.checkpoint(kinesisOffset.getSeqNumber());
+        }
+      });
+  }
+
+  /**
+   * Updates the lag metric of the stream and enqueues one envelope per record.
+   */
+  @Override
+  public void onReceiveRecords(SystemStreamPartition ssp, List<Record> records, long millisBehindLatest) {
+    metrics.updateMillisBehindLatest(ssp.getStream(), millisBehindLatest);
+    records.forEach(record -> put(ssp, translate(ssp, record)));
+  }
+
+  /**
+   * Forgets the processor of the ssp and returns the ssp to the free pool of the allocator.
+   */
+  @Override
+  public void onShutdown(SystemStreamPartition ssp) {
+    processors.remove(ssp);
+    sspAllocator.free(ssp);
+  }
+
+  /**
+   * Converts a Kinesis record into an envelope whose offset is the serialized shard id / sequence number pair and
+   * whose message is a copy of the remaining bytes of the record data.
+   */
+  private IncomingMessageEnvelope translate(SystemStreamPartition ssp, Record record) {
+    String shardId = processors.get(ssp).getShardId();
+    byte[] payload = new byte[record.getData().remaining()];
+
+    // Metrics are updated before the data buffer is read below.
+    metrics.updateMetrics(ssp.getStream(), record);
+    record.getData().get(payload);
+    KinesisSystemConsumerOffset offset = new KinesisSystemConsumerOffset(shardId, record.getSequenceNumber());
+    return new KinesisIncomingMessageEnvelope(ssp, offset.toString(), record.getPartitionKey(),
+        payload, shardId, record.getSequenceNumber(), record.getApproximateArrivalTimestamp());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
new file mode 100644
index 0000000..13296ca
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Kinesis system consumer related checkpoint information that is stored in the IncomingMessageEnvelope offset.
+ *
+ * It contains the following metadata:
+ * <ul>
+ *   <li> shardId: Kinesis stream shardId.
+ *   <li> seqNumber: sequence number in the above shard.
+ * </ul>
+ *
+ * The string form produced by {@link #toString()} is JSON and can be turned back into an instance with
+ * {@link #parse(String)}.
+ *
+ * Please note that the source of truth for checkpointing is the AWS dynamoDB table corresponding to the application.
+ * The offset that is stored in Samza checkpoint topic is not used.
+ */
+public class KinesisSystemConsumerOffset {
+
+  @JsonProperty("shardId")
+  private String shardId;
+  @JsonProperty("seqNumber")
+  private String seqNumber;
+
+  /**
+   * @param shardId Kinesis stream shardId, may be null
+   * @param seqNumber sequence number in the shard, may be null
+   */
+  @JsonCreator
+  KinesisSystemConsumerOffset(@JsonProperty("shardId") String shardId,
+      @JsonProperty("seqNumber") String seqNumber) {
+    this.shardId = shardId;
+    this.seqNumber = seqNumber;
+  }
+
+  String getShardId() {
+    return shardId;
+  }
+
+  String getSeqNumber() {
+    return seqNumber;
+  }
+
+  /**
+   * Deserializes an offset from its JSON string form.
+   *
+   * @param metadata JSON string as produced by {@link #toString()}
+   * @return the deserialized offset
+   * @throws SamzaException if metadata is null
+   */
+  static KinesisSystemConsumerOffset parse(String metadata) {
+    if (metadata == null) {
+      throw new SamzaException("Cannot parse a null Kinesis system consumer offset.");
+    }
+    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
+    return jsonSerde.fromBytes(metadata.getBytes(StandardCharsets.UTF_8));
+  }
+
+  /**
+   * @return the JSON string form of this offset
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public String toString() {
+    JsonSerdeV2<KinesisSystemConsumerOffset> jsonSerde = new JsonSerdeV2<>(KinesisSystemConsumerOffset.class);
+    // Decode with UTF-8 explicitly, mirroring parse(), instead of depending on the platform default charset.
+    return new String(jsonSerde.toBytes(this), StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof KinesisSystemConsumerOffset)) {
+      return false;
+    }
+
+    KinesisSystemConsumerOffset that = (KinesisSystemConsumerOffset) o;
+    return Objects.equals(shardId, that.getShardId()) && Objects.equals(seqNumber, that.getSeqNumber());
+  }
+
+  @Override
+  public int hashCode() {
+    // Null-safe like equals(); yields the same value as before whenever both fields are non-null.
+    int result = Objects.hashCode(shardId);
+    result = 31 * result + Objects.hashCode(seqNumber);
+    return result;
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
new file mode 100644
index 0000000..6caf760
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/NoAvailablePartitionException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+
+/**
+ * Thrown when SSPAllocator is unable to allocate an SSP.
+ */
+public class NoAvailablePartitionException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message description of why no partition could be allocated
+   */
+  public NoAvailablePartitionException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of why no partition could be allocated
+   * @param cause underlying failure; any {@link Throwable} is accepted, so existing callers passing an
+   *        {@link Exception} keep working
+   */
+  public NoAvailablePartitionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
new file mode 100644
index 0000000..4b7cff8
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.consumer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.Validate;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SSPAllocator hands out Samza SystemStreamPartitions (SSPs) from a per-stream pool of free ssps. It has two
+ * operations:
+ * <ul>
+ *   <li> allocate: removes a free ssp of the given stream from the pool and returns it.
+ *   <li> free: puts an ssp (back) into the free pool of its stream.
+ * </ul>
+ * When a stream has no free ssp left, allocate throws NoAvailablePartitionException. The allocator could run out
+ * of free ssps as a result of dynamic shard splits. Both operations are synchronized on the allocator instance.
+ */
+class SSPAllocator {
+  private static final Logger LOG = LoggerFactory.getLogger(SSPAllocator.class.getName());
+
+  // Free (unallocated) ssps, keyed by stream name.
+  private final Map<String, Set<SystemStreamPartition>> availableSsps = new HashMap<>();
+
+  /** Removes and returns a free ssp of the given stream; fails validation if the stream has no pool yet. */
+  synchronized SystemStreamPartition allocate(String stream) throws NoAvailablePartitionException {
+    Set<SystemStreamPartition> freeSsps = availableSsps.get(stream);
+    Validate.isTrue(freeSsps != null, String.format("availableSsps is null for stream %s", stream));
+
+    if (freeSsps.isEmpty()) {
+      // No free ssp left for this stream; the caller has to deal with the failure.
+      throw new NoAvailablePartitionException(String.format("More shards detected for stream %s than initially"
+          + " registered. Could be the result of dynamic resharding.", stream));
+    }
+
+    SystemStreamPartition allocated = freeSsps.iterator().next();
+    freeSsps.remove(allocated);
+    LOG.info("Number of unassigned partitions for system-stream {} is {}.", allocated.getSystemStream(),
+        freeSsps.size());
+    return allocated;
+  }
+
+  /** Adds the ssp to the free pool of its stream; fails validation if it is already in the pool. */
+  synchronized void free(SystemStreamPartition ssp) {
+    Set<SystemStreamPartition> freeSsps = availableSsps.computeIfAbsent(ssp.getStream(), p -> new HashSet<>());
+    Validate.isTrue(freeSsps.add(ssp), String.format("Ssp %s is already in free pool.", ssp));
+    LOG.info("Number of unassigned partitions for system-stream {} is {}.", ssp.getSystemStream(), freeSsps.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
new file mode 100644
index 0000000..2f42981
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/KinesisSystemConsumerMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import com.amazonaws.services.kinesis.model.Record;
+
+
+/**
+ * KinesisSystemConsumerMetrics holds per-stream metrics and aggregate metrics across kinesis system consumers.
+ */
+public class KinesisSystemConsumerMetrics {
+  private final MetricsRegistry registry;
+
+  // Aggregate metrics across all kinesis system consumers (static, so shared by every instance of this class)
+  private static Counter aggEventReadRate = null;
+  private static Counter aggEventByteReadRate = null;
+  private static SamzaHistogram aggReadLatency = null;
+  private static SamzaHistogram aggMillisBehindLatest = null;
+
+  // Per-stream metrics, keyed by stream name; created in initializeMetrics
+  private Map<String, Counter> eventReadRates;
+  private Map<String, Counter> eventByteReadRates;
+  private Map<String, SamzaHistogram> readLatencies;
+  private Map<String, SamzaHistogram> millisBehindLatest;
+
+  private static final Object LOCK = new Object();
+  private static final String AGGREGATE = "aggregate";
+  private static final String EVENT_READ_RATE = "eventReadRate";
+  private static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
+  private static final String READ_LATENCY = "readLatency";
+  private static final String MILLIS_BEHIND_LATEST = "millisBehindLatest";
+
+  public KinesisSystemConsumerMetrics(MetricsRegistry registry) {
+    this.registry = registry;
+  }
+
+  /** Creates the per-stream metrics for streamNames and, on the first call of any instance, the aggregates. */
+  public void initializeMetrics(Set<String> streamNames) {
+    eventReadRates = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE)));
+    eventByteReadRates = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE)));
+    readLatencies = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY)));
+    millisBehindLatest = streamNames.stream()
+        .collect(Collectors.toConcurrentMap(Function.identity(),
+            x -> new SamzaHistogram(registry, x, MILLIS_BEHIND_LATEST)));
+
+    // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers.
+    synchronized (LOCK) {
+      if (aggEventReadRate == null) {
+        aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE);
+        aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE);
+        aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY);
+        aggMillisBehindLatest = new SamzaHistogram(registry, AGGREGATE, MILLIS_BEHIND_LATEST);
+      }
+    }
+  }
+
+  /** Records millisBehindLatest in the histogram of the given stream and in the aggregate histogram. */
+  public void updateMillisBehindLatest(String stream, Long millisBehindLatest) {
+    this.millisBehindLatest.get(stream).update(millisBehindLatest);
+    aggMillisBehindLatest.update(millisBehindLatest);
+  }
+
+  /** Counts the record and its size, and records the millis elapsed since its approximate arrival timestamp. */
+  public void updateMetrics(String stream, Record record) {
+    eventReadRates.get(stream).inc();
+    aggEventReadRate.inc();
+
+    long recordSize = record.getData().array().length + record.getPartitionKey().length();
+    eventByteReadRates.get(stream).inc(recordSize);
+    aggEventByteReadRate.inc(recordSize);
+    // Duration.between(start, end) is end - start: the arrival time is the start and now is the end.
+    long latencyMs = Duration.between(record.getApproximateArrivalTimestamp().toInstant(), Instant.now()).toMillis();
+    readLatencies.get(stream).update(latencyMs);
+    aggReadLatency.update(latencyMs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
new file mode 100644
index 0000000..29964dc
--- /dev/null
+++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/metrics/SamzaHistogram.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis.metrics;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+
+
+/** Histogram that publishes each of its percentiles as a gauge named name_percentile (e.g. name_99.0). */
+class SamzaHistogram {
+  private static final List<Double> DEFAULT_HISTOGRAM_PERCENTILES = Arrays.asList(50D, 99D);
+  private final MetricsRegistry registry;
+  private final Histogram histogram;
+  private final Map<Double, Gauge<Double>> gauges;
+
+  SamzaHistogram(MetricsRegistry registry, String group, String name) {
+    this(registry, group, name, DEFAULT_HISTOGRAM_PERCENTILES);
+  }
+
+  /** Registers one gauge per percentile in (0, 100]; the percentile is in the gauge name to keep names distinct. */
+  SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
+    this.registry = registry;
+    this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
+    this.gauges = percentiles.stream()
+        .filter(x -> x > 0 && x <= 100)
+        .collect(Collectors.toMap(Function.identity(), x -> this.registry.newGauge(group, name + "_" + x, 0D)));
+  }
+
+  /** Adds value to the histogram and sets every gauge to its percentile of the resulting snapshot. */
+  synchronized void update(long value) {
+    histogram.update(value);
+    Snapshot values = histogram.getSnapshot();
+    // Iterate the registered gauges, not the requested percentiles: out-of-range percentiles have no gauge.
+    gauges.forEach((percentile, gauge) -> gauge.set(values.getValue(percentile / 100)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
new file mode 100644
index 0000000..93887ed
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisAWSCredentialsProvider.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/** Tests which credentials KinesisAWSCredentialsProvider returns for different combinations of keys. */
+public class TestKinesisAWSCredentialsProvider {
+  @Test
+  public void testCredentialsProviderWithNonNullKeys() {
+    String accessKey = "accessKey";
+    String secretKey = "secretKey";
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(accessKey, secretKey);
+    assertEquals(accessKey, credProvider.getCredentials().getAWSAccessKeyId());
+    assertEquals(secretKey, credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullAccessKey() {
+    // Only the secret key is given: neither key is expected in the returned credentials.
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(null, "secretKey");
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullSecretKey() {
+    // Only the access key is given: neither key is expected in the returned credentials.
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider("accessKey", null);
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+
+  @Test
+  public void testCredentialsProviderWithNullKeys() {
+    KinesisAWSCredentialsProvider credProvider = new KinesisAWSCredentialsProvider(null, null);
+    assertNull(credProvider.getCredentials().getAWSAccessKeyId());
+    assertNull(credProvider.getCredentials().getAWSSecretKey());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/9961023f/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
----------------------------------------------------------------------
diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
new file mode 100644
index 0000000..56e4810
--- /dev/null
+++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/TestKinesisConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kinesis;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+
+import static org.junit.Assert.*;
+
+
+/** Tests for the stream names, credentials, region, AWS client config and KCL config read by KinesisConfig. */
+public class TestKinesisConfig {
+  @Test
+  public void testGetKinesisStreams() {
+    Map<String, String> kv = new HashMap<>();
+    kv.put("systems.kinesis.streams.kinesis-stream1.prop1", "value1");
+    kv.put("systems.kinesis.streams.kinesis-stream1.prop2", "value2");
+    kv.put("systems.kinesis.streams.kinesis-stream2.prop1", "value3");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    // The keys above name two distinct streams (kinesis-stream1 and kinesis-stream2).
+    Set<String> streams = kConfig.getKinesisStreams("kinesis");
+    assertEquals(2, streams.size());
+  }
+
+  @Test
+  public void testKinesisConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String stream = "kinesis-stream";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+    String ssConfigPrefix = String.format("systems.%s.streams.%s.", system, stream);
+    // Region is set per system, the access key per stream and the secret key per stream under "sensitive.".
+    kv.put("sensitive." + ssConfigPrefix + "aws.secretKey", "secretKey");
+    kv.put(systemConfigPrefix + "aws.region", "us-east-1");
+    kv.put(ssConfigPrefix + "aws.accessKey", "accessKey");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    assertEquals("us-east-1", kConfig.getRegion(system, stream).getName());
+    assertEquals("accessKey", kConfig.getStreamAccessKey(system, stream));
+    assertEquals("secretKey", kConfig.getStreamSecretKey(system, stream));
+  }
+
+  @Test
+  public void testAwsClientConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+
+    // AWS client configs
+    kv.put(systemConfigPrefix + "aws.clientConfig.ProxyHost", "hostName");
+    kv.put(systemConfigPrefix + "aws.clientConfig.ProxyPort", "8080");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+
+    assertEquals("hostName", kConfig.getAWSClientConfig(system).getProxyHost());
+    assertEquals(8080, kConfig.getAWSClientConfig(system).getProxyPort());
+  }
+
+  @Test
+  public void testKclConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    String system = "kinesis";
+    String stream = "kinesis-stream";
+    String systemConfigPrefix = String.format("systems.%s.", system);
+
+    // region config is required for setting kcl config.
+    kv.put(systemConfigPrefix + "aws.region", "us-east-1");
+
+    // KCL configs (system level)
+    kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table");
+    kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100");
+    kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true");
+    kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON");
+    // override one of the KCL configs for kinesis-stream1 only
+    kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    KinesisClientLibConfiguration kclConfig = kConfig.getKinesisClientLibConfig(system, stream, "sample-app");
+    // kinesis-stream has no stream-level KCL keys, so the system-level values are expected.
+    assertEquals("sample-table", kclConfig.getTableName());
+    assertEquals(100, kclConfig.getMaxRecords());
+    assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList());
+    assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream());
+
+    // verify that the overridden config is applied for kinesis-stream1
+    kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app");
+    assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream());
+  }
+
+  @Test
+  public void testgetKCLConfigWithUnknownConfigs() {
+    Map<String, String> kv = new HashMap<>();
+    kv.put("systems.kinesis.aws.region", "us-east-1");
+    kv.put("systems.kinesis.streams.kinesis-stream.aws.kcl.random", "value");
+
+    Config config = new MapConfig(kv);
+    KinesisConfig kConfig = new KinesisConfig(config);
+    // Should not throw any exception and just ignore the unknown configs.
+    kConfig.getKinesisClientLibConfig("kinesis", "kinesis-stream", "sample-app");
+  }
+}