You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/08/03 05:53:58 UTC

[1/3] storm git commit: Merge branch 'STORM-1839' of https://github.com/priyank5485/storm into STORM-1839

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 14924e3ea -> f4480ade9


http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
new file mode 100644
index 0000000..1894934
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
@@ -0,0 +1,31 @@
+package org.apache.storm.kinesis.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Terminal bolt for the sample Kinesis topology: logs each incoming tuple and acks it.
+ * It declares no output fields and never emits.
+ */
+public class KinesisBoltTest extends BaseRichBolt {
+    protected static final Logger LOG = LoggerFactory.getLogger(KinesisBoltTest.class);
+    // Assigned in prepare(); transient so it is excluded from Java serialization of the bolt.
+    private transient OutputCollector collector;
+
+    /**
+     * Stores the collector so that {@link #execute(Tuple)} can ack tuples.
+     *
+     * @param stormConf topology configuration (not used by this bolt)
+     * @param context   topology context (not used by this bolt)
+     * @param collector collector used to ack incoming tuples
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    /**
+     * Logs the tuple at INFO level and acks it.
+     *
+     * @param input tuple received from the upstream component
+     */
+    @Override
+    public void execute(Tuple input) {
+        // Parameterized logging: the tuple is only rendered to a string when INFO is enabled.
+        LOG.info("input = [{}]", input);
+        collector.ack(input);
+    }
+
+    /**
+     * Intentionally empty: this bolt does not emit, so there is nothing to declare.
+     *
+     * @param declarer output fields declarer (not used)
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
new file mode 100644
index 0000000..2a39463
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
@@ -0,0 +1,40 @@
+package org.apache.storm.kinesis.spout.test;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kinesis.spout.CredentialsProviderChain;
+import org.apache.storm.kinesis.spout.ExponentialBackoffRetrier;
+import org.apache.storm.kinesis.spout.KinesisConfig;
+import org.apache.storm.kinesis.spout.KinesisConnectionInfo;
+import org.apache.storm.kinesis.spout.KinesisSpout;
+import org.apache.storm.kinesis.spout.RecordToTupleMapper;
+import org.apache.storm.kinesis.spout.ZkInfo;
+import org.apache.storm.topology.TopologyBuilder;
+
+import java.util.Date;
+
+/**
+ * Sample topology that wires a {@link KinesisSpout} to a {@link KinesisBoltTest} and submits it.
+ * <p>
+ * Usage: {@code KinesisSpoutTopology <topologyName> <streamName>}
+ */
+public class KinesisSpoutTopology {
+    /**
+     * Builds and submits the sample topology.
+     *
+     * @param args {@code args[0]} is the topology name, {@code args[1]} is the name of the kinesis stream to consume
+     * @throws IllegalArgumentException if fewer than two arguments are supplied
+     * @throws InvalidTopologyException declared by {@link StormSubmitter#submitTopology}
+     * @throws AuthorizationException declared by {@link StormSubmitter#submitTopology}
+     * @throws AlreadyAliveException declared by {@link StormSubmitter#submitTopology}
+     */
+    public static void main (String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
+        // fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException
+        if (args == null || args.length < 2) {
+            throw new IllegalArgumentException("Usage: KinesisSpoutTopology <topologyName> <streamName>");
+        }
+        String topologyName = args[0];
+        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+        // last argument is the maximum number of records fetched per GetRecords request
+        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+                1000);
+        // zkUrl, zkNode, session timeout ms, connection timeout ms, commit interval ms, retry attempts, retry interval ms
+        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+        // the Date is only relevant for the AT_TIMESTAMP iterator type; last argument is maxUncommittedRecords
+        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("spout", kinesisSpout, 3);
+        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+        Config topologyConfig = new Config();
+        topologyConfig.setDebug(true);
+        topologyConfig.setNumWorkers(3);
+        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
new file mode 100644
index 0000000..03e024a
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
@@ -0,0 +1,41 @@
+package org.apache.storm.kinesis.spout.test;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.storm.kinesis.spout.RecordToTupleMapper;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Sample {@link RecordToTupleMapper} that turns a kinesis record into a three-field tuple of
+ * partition key, sequence number and the record payload decoded as UTF-8 text.
+ */
+public class TestRecordToTupleMapper implements RecordToTupleMapper, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(TestRecordToTupleMapper.class);
+
+    /**
+     * @return the fields of the tuples built by {@link #getTuple(Record)}, in the same order as the values
+     */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("partitionKey", "sequenceNumber", "data");
+    }
+
+    /**
+     * Builds the tuple values for a record. If the payload is not valid UTF-8 an empty string is used as data.
+     *
+     * @param record kinesis record to convert
+     * @return list holding partition key, sequence number and decoded data, in that order
+     */
+    @Override
+    public List<Object> getTuple(Record record) {
+        // a fresh decoder per call; CharsetDecoder instances are not safe for concurrent use
+        CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+        List<Object> tuple = new ArrayList<>();
+        tuple.add(record.getPartitionKey());
+        tuple.add(record.getSequenceNumber());
+        try {
+            // decode a duplicate so that the position of the record's own buffer is left untouched
+            String data = decoder.decode(record.getData().duplicate()).toString();
+            LOG.info("data is {}", data);
+            tuple.add(data);
+        } catch (CharacterCodingException e) {
+            // the logger records the stack trace, so no separate printStackTrace() is needed
+            LOG.warn("Exception occured. Emitting tuple with empty string data", e);
+            tuple.add("");
+        }
+        LOG.info("Tuple from record is {}", tuple);
+        return tuple;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01105c1..da006a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -291,6 +291,7 @@
         <module>external/storm-kafka-client</module>
         <module>external/storm-opentsdb</module>
         <module>external/storm-kafka-monitor</module>
+        <module>external/storm-kinesis</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 646bef0..0471501 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -107,6 +107,20 @@
             </includes>
         </fileSet>
         <fileSet>
+            <directory>${project.basedir}/../../external/storm-kinesis/target</directory>
+            <outputDirectory>external/storm-kinesis</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-kinesis</directory>
+            <outputDirectory>external/storm-kinesis</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
+        <fileSet>
             <directory>${project.basedir}/../../external/storm-hdfs/target</directory>
             <outputDirectory>external/storm-hdfs</outputDirectory>
             <includes>


[3/3] storm git commit: Added STORM-1839 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1839 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f4480ade
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f4480ade
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f4480ade

Branch: refs/heads/1.x-branch
Commit: f4480ade9aa124152b4946b9c6b9d90103128300
Parents: eed4b42
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Aug 2 21:59:42 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Aug 2 21:59:42 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f4480ade/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0e181d8..ada24e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-1839: Storm spout implementation for Amazon Kinesis Streams.
  * STORM-1876: Option to build storm-kafka and storm-kafka-client with different kafka client version
  * STORM-2000: Package storm-opentsdb as part of external dir in installation
  * STORM-1989: X-Frame-Options support for Storm UI


[2/3] storm git commit: Merge branch 'STORM-1839' of https://github.com/priyank5485/storm into STORM-1839

Posted by sr...@apache.org.
Merge branch 'STORM-1839' of https://github.com/priyank5485/storm into STORM-1839


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eed4b423
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eed4b423
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eed4b423

Branch: refs/heads/1.x-branch
Commit: eed4b42358faf0eef2585a402fbe342b10a2edc3
Parents: 14924e3
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Aug 2 21:35:10 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Aug 2 21:59:12 2016 -0700

----------------------------------------------------------------------
 external/storm-kinesis/README.md                | 140 ++++++
 external/storm-kinesis/pom.xml                  |  69 +++
 .../kinesis/spout/CredentialsProviderChain.java |  39 ++
 .../spout/ExponentialBackoffRetrier.java        | 164 +++++++
 .../spout/FailedMessageRetryHandler.java        |  48 ++
 .../storm/kinesis/spout/KinesisConfig.java      | 133 ++++++
 .../storm/kinesis/spout/KinesisConnection.java  | 108 +++++
 .../kinesis/spout/KinesisConnectionInfo.java    | 111 +++++
 .../storm/kinesis/spout/KinesisMessageId.java   |  73 +++
 .../kinesis/spout/KinesisRecordsManager.java    | 449 +++++++++++++++++++
 .../storm/kinesis/spout/KinesisSpout.java       |  86 ++++
 .../kinesis/spout/RecordToTupleMapper.java      |  38 ++
 .../storm/kinesis/spout/ZKConnection.java       |  95 ++++
 .../org/apache/storm/kinesis/spout/ZkInfo.java  | 118 +++++
 .../kinesis/spout/test/KinesisBoltTest.java     |  31 ++
 .../spout/test/KinesisSpoutTopology.java        |  40 ++
 .../spout/test/TestRecordToTupleMapper.java     |  41 ++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 19 files changed, 1798 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/README.md b/external/storm-kinesis/README.md
new file mode 100644
index 0000000..f163a54
--- /dev/null
+++ b/external/storm-kinesis/README.md
@@ -0,0 +1,140 @@
+# Storm Kinesis Spout
+Provides core storm spout for consuming data from a stream in Amazon Kinesis Streams. It stores the sequence numbers that can be committed in zookeeper and 
+starts consuming records after that sequence number on restart by default. Below is the code sample to create a sample topology that uses the spout. Each 
+object used in configuring the spout is explained below. Ideally, the number of spout tasks should be equal to number of shards in kinesis. However each task 
+can read from more than one shard.
+
+```java
+public class KinesisSpoutTopology {
+    public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
+        String topologyName = args[0];
+        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+                1000);
+        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("spout", kinesisSpout, 3);
+        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+        Config topologyConfig = new Config();
+        topologyConfig.setDebug(true);
+        topologyConfig.setNumWorkers(3);
+        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+    }
+}
+```
+As you can see above the spout takes an object of KinesisConfig in its constructor. The constructor of KinesisConfig takes 8 objects as explained below.
+
+#### `String` streamName
+name of kinesis stream to consume data from
+
+#### `ShardIteratorType` shardIteratorType
+3 types are supported - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. By default this argument is ignored if state for shards 
+is found in zookeeper. Hence they will apply the first time a topology is started. If you want to use any of these in subsequent runs of the topology, you 
+will need to clear the state of zookeeper node used for storing sequence numbers
+
+#### `RecordToTupleMapper` recordToTupleMapper
+an implementation of `RecordToTupleMapper` interface that spout will call to convert a kinesis record to a storm tuple. It has two methods. getOutputFields 
+tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked
+```java
+    Fields getOutputFields ();
+    List<Object> getTuple (Record record);
+```
+
+#### `Date` timestamp
+used in conjunction with the AT_TIMESTAMP shardIteratorType argument. This will make the spout fetch records from kinesis starting at that time or later. The
+time used by kinesis is the server side time associated to the record by kinesis
+
+#### `FailedMessageRetryHandler` failedMessageRetryHandler 
+an implementation of the `FailedMessageRetryHandler` interface. By default this module provides an implementation that supports an exponential backoff retry
+mechanism for failed messages. That implementation has two constructors. Default no args constructor will configure first retry at 100 milliseconds and 
+subsequent retries at Math.pow(2, i-1) seconds where i is the retry number in the range 2 to Long.MAX_VALUE. 2 represents the base for exponential function in seconds. 
+Other constructor takes retry interval in millis for first retry as first argument, base for exponential function in seconds as second argument and number of 
+retries as third argument. The methods of this interface and how they work together with the spout are explained below
+```java
+    boolean failed (KinesisMessageId messageId);
+    KinesisMessageId getNextFailedMessageToRetry ();
+    void failedMessageEmitted (KinesisMessageId messageId);
+    void acked (KinesisMessageId messageId);
+```
+failed method will be called on every tuple that failed in the spout. It should return true if that failed message is scheduled to be retried, false otherwise.
+
+getNextFailedMessageToRetry method will be called the first thing every time a spout wants to emit a tuple. It should return a message that should be retried
+if any or null otherwise. Note that it can return null in the case it does not have any message to retry as of that moment. However, it should eventually 
+return every message for which it returned true when failed method was called for that message
+
+failedMessageEmitted will be called if spout successfully manages to get the record from kinesis and emit it. If not, the implementation should return the same 
+message when getNextFailedMessageToRetry is called again
+
+acked will be called once the failed message was re-emitted and successfully acked by the spout. If it was failed by the spout failed will be called again
+
+#### `ZkInfo` zkInfo
+an object encapsulating information for zookeeper interaction. The constructor takes zkUrl as first argument which is a comma separated string of zk host and
+port, zkNode as second that will be used as the root node for storing committed sequence numbers, session timeout as third in milliseconds, connection timeout
+as fourth in milliseconds, commit interval as fifth in milliseconds for committing sequence numbers to zookeeper, retry attempts as sixth for zk client
+connection retry attempts, retry interval as seventh in milliseconds for time to wait before retrying to connect. 
+
+#### `KinesisConnectionInfo` kinesisConnectionInfo
+an object that captures arguments for connecting to kinesis using kinesis client. It has a constructor that takes an implementation of `AWSCredentialsProvider`
+as first argument. This module provides an implementation called `CredentialsProviderChain` that allows the spout to authenticate with kinesis using one of 
+the 5 mechanisms in this order - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`, 
+`InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`. It takes an object of `ClientConfiguration` as second argument for configuring the kinesis 
+client, `Regions` as third argument that sets the region to connect to on the client and recordsLimit as the fourth argument which represents the maximum number
+of records kinesis client will retrieve for every GetRecords request. This limit should be carefully chosen based on the size of the record, kinesis 
+throughput rate limits and per tuple latency in storm for the topology. Also if one task will be reading from more than one shard then that will also affect
+the choice of limit argument
+
+#### `Long` maxUncommittedRecords
+this represents the maximum number of uncommitted sequence numbers allowed per task. Once this number is reached spout will not fetch any new records from 
+kinesis. Uncommitted sequence numbers are defined as the sum of all the messages for a task that have not been committed to zookeeper. This is different from 
+topology level max pending messages. For example if this value is set to 10, and the spout emitted sequence numbers from 1 to 10. Sequence number 1 is pending 
+and 2 to 10 acked. In that case the number of uncommitted sequence numbers is 10 since no sequence number in the range 1 to 10 can be committed to zk. 
+However, storm can still call next tuple on the spout because there is only 1 pending message
+ 
+### Maven dependencies
+Aws sdk version that this was tested with is 1.10.77
+
+```xml
+ <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>${aws-java-sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+ </dependencies>
+```
+
+# Future Work
+Handle merging or splitting of shards in kinesis, Trident spout implementation and metrics
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml
new file mode 100644
index 0000000..f1872dc
--- /dev/null
+++ b/external/storm-kinesis/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>storm-kinesis</artifactId>
+    <name>storm-kinesis</name>
+
+    <properties>
+        <aws-java-sdk.version>1.10.77</aws-java-sdk.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>${aws-java-sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
+

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
new file mode 100644
index 0000000..4287ae0
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+
+/**
+ * Class representing chain of mechanisms that will be used in order to connect to kinesis.
+ * The providers are handed to {@link AWSCredentialsProviderChain} in this order: environment variables,
+ * system properties, classpath properties file, instance profile and profile credentials.
+ */
+public class CredentialsProviderChain extends AWSCredentialsProviderChain {
+    public CredentialsProviderChain () {
+        super(new EnvironmentVariableCredentialsProvider(),
+                new SystemPropertiesCredentialsProvider(),
+                new ClasspathPropertiesFileCredentialsProvider(),
+                new InstanceProfileCredentialsProvider(),
+                new ProfileCredentialsProvider());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
new file mode 100644
index 0000000..2a702f8
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+    // Wait interval for retrying after first failure
+    private final Long initialDelayMillis;
+    // Base for exponential function in seconds for retrying for second, third and so on failures
+    private final Long baseSeconds;
+    // Maximum number of retries
+    private final Long maxRetries;
+    // map to track number of failures for each kinesis message that failed
+    private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+    // map to track next retry time for each kinesis message that failed
+    private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+    // sorted set of records to be retried based on retry time. earliest retryTime record comes first
+    private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
+
+    /**
+     * No-args constructor that uses defaults of 100 ms for the first retry, max retries of Long.MAX_VALUE and an exponential backoff of
+     * Math.pow(2, i-1) seconds for retry i, where i = 2, 3, ...
+     */
+    public ExponentialBackoffRetrier () {
+        this(100L, 2L, Long.MAX_VALUE);
+    }
+
+    /**
+     * Creates a retrier with the given settings. The arguments are checked by validate() after they are stored.
+     *
+     * @param initialDelayMillis delay in milliseconds for first retry
+     * @param baseSeconds base for exponent function in seconds
+     * @param maxRetries maximum number of retries before the record is discarded/acked
+     * @throws IllegalArgumentException if any of the arguments is negative
+     */
+    public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
+        this.initialDelayMillis = initialDelayMillis;
+        this.baseSeconds = baseSeconds;
+        this.maxRetries = maxRetries;
+        validate();
+    }
+
+    /**
+     * Checks the constructor arguments in declaration order and rejects the first negative one.
+     *
+     * @throws IllegalArgumentException if initialDelayMillis, baseSeconds or maxRetries is negative
+     */
+    private void validate () {
+        final String violation;
+        if (initialDelayMillis < 0) {
+            violation = "initialDelayMillis cannot be negative.";
+        } else if (baseSeconds < 0) {
+            violation = "baseSeconds cannot be negative.";
+        } else if (maxRetries < 0) {
+            violation = "maxRetries cannot be negative.";
+        } else {
+            // all three values are non-negative, nothing to report
+            return;
+        }
+        throw new IllegalArgumentException(violation);
+    }
+    /**
+     * Records a failure for the given message and schedules it for a retry unless retries are disabled or exhausted.
+     *
+     * @param messageId id of the kinesis message that failed
+     * @return true if the message was scheduled for a retry, false if it will not be retried
+     */
+    @Override
+    public boolean failed(KinesisMessageId messageId) {
+        LOG.debug("Handling failed message {}", messageId);
+        // if maxRetries is 0, dont retry and return false as per interface contract
+        if (maxRetries == 0) {
+            LOG.warn("maxRetries set to 0. Hence not queueing {}", messageId);
+            return false;
+        }
+        // a message with no recorded failures starts from 0, so its first failure is counted as 1
+        Long previousFailures = failCounts.get(messageId);
+        Long failCount = (previousFailures == null) ? 1L : previousFailures + 1;
+        // if fail count is greater than maxRetries, discard or ack. for e.g. for maxRetries 3, 4 failures are allowed at maximum
+        if (failCount > maxRetries) {
+            LOG.warn("maxRetries reached so dropping {}", messageId);
+            failCounts.remove(messageId);
+            return false;
+        }
+        failCounts.put(messageId, failCount);
+        // if reached so far, add it to the set of messages waiting to be retried with next retry time based on how many times it failed.
+        // The retry time is stored before the id is added to the sorted set; presumably RetryTimeComparator reads retryTimes - TODO confirm
+        Long retryTime = getRetryTime(failCount);
+        retryTimes.put(messageId, retryTime);
+        retryMessageSet.add(messageId);
+        LOG.debug("Scheduled {} for retry at {} and retry attempt {}", messageId, retryTime, failCount);
+        return true;
+    }
+
+    /**
+     * Forgets the failure count kept for a message once it has been acked.
+     *
+     * @param messageId id of the message that was acked
+     */
+    @Override
+    public void acked(KinesisMessageId messageId) {
+        // message was acked after being retried. so clear the state for that message
+        // parameterized logging keeps the message from being built when debug is disabled
+        LOG.debug("Ack received for {}. Hence cleaning state.", messageId);
+        failCounts.remove(messageId);
+    }
+
+    /**
+     * Looks up the queued message with the earliest retry time and returns it if that time has been reached.
+     * The message stays queued until failedMessageEmitted is called for it.
+     *
+     * @return id of the message to retry now, or null if nothing is queued or the earliest retry time is still in the future
+     */
+    @Override
+    public KinesisMessageId getNextFailedMessageToRetry() {
+        KinesisMessageId result = null;
+        // the set is sorted by retry time, so only its first element can be due
+        if (!retryMessageSet.isEmpty()) {
+            KinesisMessageId earliest = retryMessageSet.first();
+            if (retryTimes.get(earliest) <= System.nanoTime()) {
+                result = earliest;
+            }
+        }
+        // parameterized logging keeps the message from being built when debug is disabled
+        LOG.debug("Returning {} to spout for retrying.", result);
+        return result;
+    }
+
+    /**
+     * Removes a message from the retry queue after the spout has emitted it again.
+     * Its failure count is kept so that a further failure continues the backoff and an ack can clear it.
+     *
+     * @param messageId id of the message the spout emitted for retry
+     */
+    @Override
+    public void failedMessageEmitted(KinesisMessageId messageId) {
+        // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and wait for its ack or fail
+        // but still keep it in counts map to retry again on failure or remove on ack
+        LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", messageId);
+        // the set's comparator looks up retryTimes for the id being removed, so an id without an entry there must never reach the set;
+        // without this guard such a call would fail with a NullPointerException inside the comparator
+        if (retryTimes.containsKey(messageId)) {
+            // order matters: the set has to be updated while the retry time used for ordering is still present
+            retryMessageSet.remove(messageId);
+            retryTimes.remove(messageId);
+        }
+    }
+
+    // private helper method to get next retry time (a System.nanoTime() based value) for retry attempt retryNum.
+    // the first retry is delayed by initialDelayMillis, retry i > 1 by Math.pow(baseSeconds, i - 1) seconds.
+    // long overflow is handled on both paths by capping the result to Long.MAX_VALUE
+    private Long getRetryTime (Long retryNum) {
+        final long now = System.nanoTime();
+        final long nanosPerMilli = 1000000L;
+        final long delayNanos;
+        if (retryNum == 1) {
+            // the millis to nanos conversion itself can overflow for very large initial delays, so check before multiplying
+            if (initialDelayMillis > Long.MAX_VALUE / nanosPerMilli) {
+                return Long.MAX_VALUE;
+            }
+            delayNanos = initialDelayMillis * nanosPerMilli;
+        } else {
+            // exponential backoff is computed as a double so that a delay beyond the long range can be detected before narrowing it
+            double delay = Math.pow(baseSeconds, retryNum - 1) * 1000 * nanosPerMilli;
+            if (delay >= Long.MAX_VALUE) {
+                return Long.MAX_VALUE;
+            }
+            delayNanos = (long) delay;
+        }
+        // long addition over the limit circles back, so a sum smaller than the starting point means the addition overflowed
+        long retryTime = now + delayNanos;
+        return (retryTime < now) ? Long.MAX_VALUE : retryTime;
+    }
+
+    // Orders queued message ids by their scheduled retry time, earliest first. Ids that share the same retry time are ordered by stream
+    // name, shard id and sequence number so that two different ids never compare as equal. A sorted set treats elements that compare as 0
+    // as duplicates, so without the tie-break the second of two ids with the same retry time would silently not be queued.
+    // Both ids must have an entry in retryTimes when this comparator is invoked.
+    private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> {
+        @Override
+        public int compare(KinesisMessageId o1, KinesisMessageId o2) {
+            int result = retryTimes.get(o1).compareTo(retryTimes.get(o2));
+            if (result == 0) {
+                result = compareNullable(o1.getStreamName(), o2.getStreamName());
+            }
+            if (result == 0) {
+                result = compareNullable(o1.getShardId(), o2.getShardId());
+            }
+            if (result == 0) {
+                result = compareNullable(o1.getSequenceNumber(), o2.getSequenceNumber());
+            }
+            return result;
+        }
+
+        // null-safe string comparison, null sorts first
+        private int compareNullable(String left, String right) {
+            if (left == null) {
+                return (right == null) ? 0 : -1;
+            }
+            return (right == null) ? 1 : left.compareTo(right);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
new file mode 100644
index 0000000..bb0e450
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+/**
+ * Callbacks through which the spout reports failed, acked and re-emitted messages and asks which failed message to retry next.
+ */
+public interface FailedMessageRetryHandler extends Serializable {
+    /**
+     * message with messageId failed in the spout
+     * @param messageId id of the message that failed
+     * @return true if this failed message was scheduled to be retried, false otherwise
+     */
+    boolean failed (KinesisMessageId messageId);
+
+    /**
+     * message with messageId succeeded/acked in the spout
+     * @param messageId id of the message that was acked
+     */
+    void acked (KinesisMessageId messageId);
+
+    /**
+     * Get the next failed message's id to retry if any, null otherwise
+     * @return messageId of the next message to retry, or null if there is none
+     */
+    KinesisMessageId getNextFailedMessageToRetry ();
+
+    /**
+     * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout
+     * @param messageId id of the message that was emitted again
+     */
+    void failedMessageEmitted (KinesisMessageId messageId);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
new file mode 100644
index 0000000..744aef5
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Immutable, serializable settings holder for the kinesis spout. All values are checked once, at construction time.
+ */
+public class KinesisConfig implements Serializable {
+    // name of the kinesis stream to read from
+    private final String streamName;
+    // where to start reading a shard - only TRIM_HORIZON, LATEST and AT_TIMESTAMP are accepted
+    private final ShardIteratorType shardIteratorType;
+    // converts a kinesis record into a storm tuple
+    private final RecordToTupleMapper recordToTupleMapper;
+    // start position for AT_TIMESTAMP, may be null for the other iterator types
+    private final Date timestamp;
+    // decides if and when failed messages are retried
+    private final FailedMessageRetryHandler failedMessageRetryHandler;
+    // zk related information for storing committed sequence numbers
+    private final ZkInfo zkInfo;
+    // parameters used by the kinesis client when connecting to kinesis
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    // upper bound on messages that are not yet committed to zk; once reached the spout stops emitting. needed because one failed message
+    // blocks committing every later acked message, and topology max pending does not help as storm already considers those acked
+    private final Long maxUncommittedRecords;
+
+    /**
+     * @throws IllegalArgumentException if any value fails the checks in validate()
+     */
+    public KinesisConfig(String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp,
+                         FailedMessageRetryHandler failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo,
+                         Long maxUncommittedRecords) {
+        this.streamName = streamName;
+        this.shardIteratorType = shardIteratorType;
+        this.recordToTupleMapper = recordToTupleMapper;
+        this.timestamp = timestamp;
+        this.failedMessageRetryHandler = failedMessageRetryHandler;
+        this.zkInfo = zkInfo;
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+        this.maxUncommittedRecords = maxUncommittedRecords;
+        validate();
+    }
+
+    // checks run in a fixed order; the first violated rule determines the exception message
+    private void validate () {
+        if (streamName == null || streamName.isEmpty()) {
+            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
+        }
+        boolean sequenceNumberBased = shardIteratorType == ShardIteratorType.AFTER_SEQUENCE_NUMBER
+                || shardIteratorType == ShardIteratorType.AT_SEQUENCE_NUMBER;
+        if (shardIteratorType == null || sequenceNumberBased) {
+            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST +
+                    "," + ShardIteratorType.TRIM_HORIZON);
+        }
+        if (timestamp == null && shardIteratorType == ShardIteratorType.AT_TIMESTAMP) {
+            throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
+        }
+        rejectNull(recordToTupleMapper, "recordToTupleMapper");
+        rejectNull(failedMessageRetryHandler, "failedMessageRetryHandler");
+        rejectNull(zkInfo, "zkInfo");
+        rejectNull(kinesisConnectionInfo, "kinesisConnectionInfo");
+        if (maxUncommittedRecords == null || maxUncommittedRecords < 1) {
+            throw new IllegalArgumentException("maxUncommittedRecords has to be a positive integer");
+        }
+    }
+
+    // throws IllegalArgumentException("<fieldName> cannot be null") when value is null
+    private static void rejectNull (Object value, String fieldName) {
+        if (value == null) {
+            throw new IllegalArgumentException(fieldName + " cannot be null");
+        }
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public ShardIteratorType getShardIteratorType() {
+        return shardIteratorType;
+    }
+
+    public RecordToTupleMapper getRecordToTupleMapper() {
+        return recordToTupleMapper;
+    }
+
+    public Date getTimestamp() {
+        return timestamp;
+    }
+
+    public FailedMessageRetryHandler getFailedMessageRetryHandler () {
+        return failedMessageRetryHandler;
+    }
+
+    public ZkInfo getZkInfo () {
+        return zkInfo;
+    }
+
+    public KinesisConnectionInfo getKinesisConnectionInfo () {
+        return kinesisConnectionInfo;
+    }
+
+    public Long getMaxUncommittedRecords () {
+        return maxUncommittedRecords;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("KinesisConfig{");
+        text.append("streamName='").append(streamName).append('\'');
+        text.append(", shardIteratorType=").append(shardIteratorType);
+        text.append(", recordToTupleMapper=").append(recordToTupleMapper);
+        text.append(", timestamp=").append(timestamp);
+        text.append(", zkInfo=").append(zkInfo);
+        text.append(", kinesisConnectionInfo=").append(kinesisConnectionInfo);
+        text.append(", failedMessageRetryHandler =").append(failedMessageRetryHandler);
+        text.append(", maxUncommittedRecords=").append(maxUncommittedRecords);
+        return text.append('}').toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
new file mode 100644
index 0000000..dfd9049
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Thin wrapper around AmazonKinesisClient that lists the shards of a stream, obtains shard iterators and fetches records.
+ * initialize() has to be called before any of the request methods.
+ */
+class KinesisConnection {
+    // log under this class' own name so that its output can be filtered separately from KinesisRecordsManager
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisConnection.class);
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    private AmazonKinesisClient kinesisClient;
+
+    KinesisConnection (KinesisConnectionInfo kinesisConnectionInfo) {
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+    }
+
+    // creates the kinesis client from the credentials, client configuration and region held by the connection info
+    void initialize () {
+        kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), kinesisConnectionInfo.getClientConfiguration());
+        kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion()));
+    }
+
+    /**
+     * Collects all shards of a stream, following the paging of describeStream.
+     *
+     * @param stream name of the stream
+     * @return shards reported by describeStream, in the order they were returned
+     */
+    List<Shard> getShardsForStream (String stream) {
+        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+        describeStreamRequest.setStreamName(stream);
+        List<Shard> shards = new ArrayList<>();
+        String exclusiveStartShardId = null;
+        do {
+            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
+            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+            shards.addAll(describeStreamResult.getStreamDescription().getShards());
+            // the next page starts after the last shard seen so far; stop when kinesis reports no more shards
+            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
+                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
+            } else {
+                exclusiveStartShardId = null;
+            }
+        } while ( exclusiveStartShardId != null );
+        LOG.info("Number of shards for stream {} are {}", stream, shards.size());
+        return shards;
+    }
+
+    /**
+     * Requests a shard iterator from kinesis. Failures are logged and not propagated.
+     *
+     * @param stream name of the stream
+     * @param shardId id of the shard to read
+     * @param shardIteratorType position type; decides whether sequenceNumber or timestamp is sent with the request
+     * @param sequenceNumber used only for AFTER_SEQUENCE_NUMBER and AT_SEQUENCE_NUMBER
+     * @param timestamp used only for AT_TIMESTAMP
+     * @return the shard iterator, or an empty string if the request threw an exception
+     */
+    String getShardIterator (String stream, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+        String shardIterator = "";
+        try {
+            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
+            getShardIteratorRequest.setStreamName(stream);
+            getShardIteratorRequest.setShardId(shardId);
+            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
+            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
+            } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
+                getShardIteratorRequest.setTimestamp(timestamp);
+            }
+            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
+            if (getShardIteratorResult != null) {
+                shardIterator = getShardIteratorResult.getShardIterator();
+            }
+        } catch (Exception e) {
+            // deliberate best effort: the caller receives the empty iterator and the failure is only logged
+            LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
+                    sequenceNumber + " timestamp " + timestamp, e);
+        }
+        // the regular outcome is diagnostic detail, not a warning
+        LOG.debug("Returning shardIterator {} for shardId {} shardIteratorType {} sequenceNumber {} timestamp {}", shardIterator, shardId,
+                shardIteratorType, sequenceNumber, timestamp);
+        return shardIterator;
+    }
+
+    /**
+     * Fetches the next batch of records for a shard iterator, limited to the records limit of the connection info.
+     *
+     * @param shardIterator iterator obtained from getShardIterator or from a previous result
+     * @return the result returned by the kinesis client
+     */
+    GetRecordsResult fetchRecords (String shardIterator) {
+        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+        getRecordsRequest.setShardIterator(shardIterator);
+        getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit());
+        return kinesisClient.getRecords(getRecordsRequest);
+    }
+
+    // shuts the client down; a no-op when initialize() was never called so that cleanup paths cannot fail with a NullPointerException
+    void shutdown () {
+        if (kinesisClient != null) {
+            kinesisClient.shutdown();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
new file mode 100644
index 0000000..67ca29f
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Serializable holder for everything needed to create a kinesis client. The credentials provider and the client configuration are kept
+ * as kryo serialized bytes and are deserialized lazily on first access.
+ */
+public class KinesisConnectionInfo implements Serializable {
+    private final byte[] serializedKinesisCredsProvider;
+    private final byte[] serializedkinesisClientConfig;
+    private final Integer recordsLimit;
+    private final Regions region;
+
+    // lazily restored from the serialized bytes, never serialized themselves
+    private transient AWSCredentialsProvider credentialsProvider;
+    private transient ClientConfiguration clientConfiguration;
+
+    /**
+     *
+     * @param credentialsProvider implementation to provide credentials to connect to kinesis
+     * @param clientConfiguration client configuration to pass to kinesis client
+     * @param region region to connect to
+     * @param recordsLimit max records to be fetched in a getRecords request to kinesis
+     * @throws IllegalArgumentException if recordsLimit is null or not positive, or if region is null
+     */
+    public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) {
+        if (recordsLimit == null || recordsLimit <= 0) {
+            throw new IllegalArgumentException("recordsLimit has to be a positive integer");
+        }
+        if (region == null) {
+            throw new IllegalArgumentException("region cannot be null");
+        }
+        serializedKinesisCredsProvider = getKryoSerializedBytes(credentialsProvider);
+        serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration);
+        this.recordsLimit = recordsLimit;
+        this.region = region;
+    }
+
+    public Integer getRecordsLimit() {
+        return recordsLimit;
+    }
+
+    public AWSCredentialsProvider getCredentialsProvider() {
+        if (credentialsProvider == null) {
+            credentialsProvider = (AWSCredentialsProvider) this.getKryoDeserializedObject(serializedKinesisCredsProvider);
+        }
+        return credentialsProvider;
+    }
+
+    public ClientConfiguration getClientConfiguration() {
+        if (clientConfiguration == null) {
+            clientConfiguration = (ClientConfiguration) this.getKryoDeserializedObject(serializedkinesisClientConfig);
+        }
+        return clientConfiguration;
+    }
+
+    public Regions getRegion() {
+        return region;
+    }
+
+    // writes the object together with its class information into a byte array using kryo
+    private byte[] getKryoSerializedBytes (final Object obj) {
+        final Kryo kryo = new Kryo();
+        final ByteArrayOutputStream os = new ByteArrayOutputStream();
+        final Output output = new Output(os);
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+        kryo.writeClassAndObject(output, obj);
+        output.flush();
+        return os.toByteArray();
+    }
+
+    // counterpart of getKryoSerializedBytes
+    private Object getKryoDeserializedObject (final byte[] ser) {
+        final Kryo kryo = new Kryo();
+        final Input input = new Input(new ByteArrayInputStream(ser));
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+        return kryo.readClassAndObject(input);
+    }
+
+    /**
+     * The serialized credentials provider and client configuration may contain secrets, and this string ends up in logs
+     * (for example through KinesisConfig.toString), so only the sizes of the serialized forms are reported.
+     */
+    @Override
+    public String toString() {
+        return "KinesisConnectionInfo{" +
+                "serializedKinesisCredsProvider=<" + serializedKinesisCredsProvider.length + " bytes>" +
+                ", serializedkinesisClientConfig=<" + serializedkinesisClientConfig.length + " bytes>" +
+                ", region=" + region +
+                ", recordsLimit=" + recordsLimit +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
new file mode 100644
index 0000000..dd239f1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+/**
+ * Identifies a kinesis record by stream name, shard id and sequence number. Instances are immutable and compare by value.
+ */
+public class KinesisMessageId {
+    private final String streamName;
+    private final String shardId;
+    private final String sequenceNumber;
+
+    KinesisMessageId (String streamName, String shardId, String sequenceNumber) {
+        this.streamName = streamName;
+        this.shardId = shardId;
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    public String getStreamName () {
+        return streamName;
+    }
+
+    public String getShardId () {
+        return shardId;
+    }
+
+    public String getSequenceNumber () {
+        return sequenceNumber;
+    }
+
+    @Override
+    public String toString () {
+        StringBuilder text = new StringBuilder("KinesisMessageId{");
+        text.append("streamName='").append(streamName).append('\'');
+        text.append(", shardId='").append(shardId).append('\'');
+        text.append(", sequenceNumber='").append(sequenceNumber).append('\'');
+        return text.append('}').toString();
+    }
+
+    // two ids are equal when they are of the same class and all three fields match, null fields only matching null
+    @Override
+    public boolean equals (Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KinesisMessageId other = (KinesisMessageId) o;
+        return sameOrBothNull(streamName, other.streamName)
+                && sameOrBothNull(shardId, other.shardId)
+                && sameOrBothNull(sequenceNumber, other.sequenceNumber);
+    }
+
+    // 31 based combination of the three field hashes, a null field contributing 0
+    @Override
+    public int hashCode () {
+        int hash = hashOrZero(streamName);
+        hash = 31 * hash + hashOrZero(shardId);
+        hash = 31 * hash + hashOrZero(sequenceNumber);
+        return hash;
+    }
+
+    private static boolean sameOrBothNull (String left, String right) {
+        return (left == null) ? right == null : left.equals(right);
+    }
+
+    private static int hashOrZero (String value) {
+        return (value == null) ? 0 : value.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
new file mode 100644
index 0000000..7f3f024
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
+    // object handling zk interaction
+    private transient ZKConnection zkConnection;
+    // object handling interaction with kinesis
+    private transient KinesisConnection kinesisConnection;
+    // Kinesis Spout KinesisConfig object
+    private transient final KinesisConfig kinesisConfig;
+    // Queue of records per shard fetched from kinesis and are waiting to be emitted
+    private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
+    // Map of records  that were fetched from kinesis as a result of failure and are waiting to be emitted
+    private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>();
+    // Sequence numbers per shard that have been emitted and are still in flight. TreeSet as we need to remove on ack or fail and, at the same time, need the
+    // sorted order to figure out the sequence number to commit. Logic explained in commit
+    private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
+    // sorted acked sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
+    // sorted failed sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for new messages
+    private transient Map<String, String> shardIteratorPerShard = new HashMap<>();
+    // last fetched sequence number corresponding to position in shard
+    private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for failed messages
+    private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>();
+    // timestamp to decide when to commit to zk again
+    private transient long lastCommitTime;
+    // boolean to track deactivated state
+    private transient boolean deactivated;
+
+    /**
+     * Creates the manager and the two connection objects it delegates to. The connections are only constructed here;
+     * they are initialized later in {@link #initialize(int, int)}.
+     *
+     * @param kinesisConfig spout configuration; supplies the zk info and the kinesis connection info
+     */
+    KinesisRecordsManager (KinesisConfig kinesisConfig) {
+        this.kinesisConfig = kinesisConfig;
+        this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+        this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
+    }
+
+    /**
+     * Initializes both connections, assigns shards to this task and positions the shard iterators.
+     * Shards are distributed round robin: this task takes the shards at list positions
+     * myTaskIndex, myTaskIndex + totalTasks, myTaskIndex + 2 * totalTasks, ...
+     *
+     * @param myTaskIndex index of this task among the tasks of the spout
+     * @param totalTasks total number of tasks of the spout
+     */
+    void initialize (int myTaskIndex, int totalTasks) {
+        deactivated = false;
+        lastCommitTime = System.currentTimeMillis();
+        kinesisConnection.initialize();
+        zkConnection.initialize();
+        List<Shard> shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
+        LOG.info("myTaskIndex is " + myTaskIndex);
+        LOG.info("totalTasks is " + totalTasks);
+        for (int shardIndex = myTaskIndex; shardIndex < shards.size(); shardIndex += totalTasks) {
+            String assignedShardId = shards.get(shardIndex).getShardId();
+            LOG.info("Shard id " + assignedShardId + " assigned to task " + myTaskIndex);
+            // an (initially empty) queue in toEmitPerShard is what marks a shard as owned by this task
+            toEmitPerShard.put(assignedShardId, new LinkedList<Record>());
+        }
+        initializeFetchedSequenceNumbers();
+        refreshShardIteratorsForNewRecords();
+    }
+
+    /**
+     * Emits at most one tuple. Commits to zk first if the commit interval has elapsed, then gives priority to a failed
+     * record that is due for retry, and only otherwise emits a new record (fetching from kinesis when nothing is queued).
+     *
+     * @param collector collector used to emit the tuple
+     */
+    void next (SpoutOutputCollector collector) {
+        if (shouldCommit()) {
+            commit();
+        }
+        // a successfully re-emitted failed record uses up this call
+        if (emitNextFailedRecord(collector)) {
+            return;
+        }
+        LOG.debug("No failed record to emit for now. Hence will try to emit new records");
+        // stop emitting new records once the number of uncommitted records has reached the configured maximum
+        if (getUncommittedRecordsCount() >= kinesisConfig.getMaxUncommittedRecords()) {
+            LOG.warn("maximum uncommitted records count has reached. so not emitting any new records and returning");
+            return;
+        }
+        // no shard is assigned - probably because number of executors > number of shards
+        if (toEmitPerShard.isEmpty()) {
+            LOG.warn("No shard is assigned to this task. Hence not emitting any tuple.");
+            return;
+        }
+        if (shouldFetchNewRecords()) {
+            fetchNewRecords();
+        }
+        emitNewRecord(collector);
+    }
+
+    // asks the retry handler for a failed message that is due and tries to re-emit it; returns true only if a tuple was emitted for it
+    private boolean emitNextFailedRecord (SpoutOutputCollector collector) {
+        KinesisMessageId retryMessageId = kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
+        if (retryMessageId == null) {
+            return false;
+        }
+        BigInteger retrySequenceNumber = new BigInteger(retryMessageId.getSequenceNumber());
+        TreeSet<BigInteger> failedForShard = failedPerShard.get(retryMessageId.getShardId());
+        // if the retry service returns a message that is not in failed set then ignore it. should never happen
+        if (failedForShard == null || !failedForShard.contains(retrySequenceNumber)) {
+            LOG.warn("failedPerShard does not contain " + retryMessageId + ". This should never happen.");
+            return false;
+        }
+        if (!failedandFetchedRecords.containsKey(retryMessageId)) {
+            fetchFailedRecords(retryMessageId);
+        }
+        if (!emitFailedRecord(collector, retryMessageId)) {
+            LOG.warn("failedMessageEmitted not called on retrier for " + retryMessageId + ". This can happen a few times but should not happen " +
+                    "infinitely");
+            return false;
+        }
+        failedForShard.remove(retrySequenceNumber);
+        kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(retryMessageId);
+        return true;
+    }
+
+    /**
+     * Records an ack: the sequence number is added to the acked set of its shard and removed from the emitted and failed
+     * sets. If the record had been re-fetched for a retry, the retry handler is notified and the cached record is dropped.
+     *
+     * @param kinesisMessageId id of the acked message
+     */
+    void ack (KinesisMessageId kinesisMessageId) {
+        String shardId = kinesisMessageId.getShardId();
+        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
+        LOG.debug("Ack received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        // acked sequence numbers are kept sorted because commit needs the highest one that is safe to commit for the shard
+        TreeSet<BigInteger> ackedForShard = ackedPerShard.get(shardId);
+        if (ackedForShard == null) {
+            ackedForShard = new TreeSet<BigInteger>();
+            ackedPerShard.put(shardId, ackedForShard);
+        }
+        ackedForShard.add(sequenceNumber);
+        // the tuple is no longer in flight
+        TreeSet<BigInteger> emittedForShard = emittedPerShard.get(shardId);
+        if (emittedForShard != null) {
+            emittedForShard.remove(sequenceNumber);
+        }
+        // defensive: a re-emitted failed message has already moved from failedPerShard to emittedPerShard, so normally nothing is removed here
+        TreeSet<BigInteger> failedForShard = failedPerShard.get(shardId);
+        if (failedForShard != null) {
+            failedForShard.remove(sequenceNumber);
+        }
+        // a record present in failedandFetchedRecords failed at least once and was re-fetched: tell the retry handler and free the record
+        if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
+            kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
+            failedandFetchedRecords.remove(kinesisMessageId);
+        }
+        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
+        if (deactivated) {
+            commit();
+        }
+    }
+
+    /**
+     * Records a failure. If the retry handler accepts the message for retry, its sequence number moves from the emitted
+     * set to the failed set of its shard; otherwise the message is treated as acked so that it does not block commits.
+     *
+     * @param kinesisMessageId id of the failed message
+     */
+    void fail (KinesisMessageId kinesisMessageId) {
+        String shardId = kinesisMessageId.getShardId();
+        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
+        LOG.debug("Fail received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        // for a failed message add it to failed set if it will be retried, otherwise ack it; remove from emitted either way
+        if (kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
+            if (!failedPerShard.containsKey(shardId)) {
+                failedPerShard.put(shardId, new TreeSet<BigInteger>());
+            }
+            failedPerShard.get(shardId).add(sequenceNumber);
+            // guard against a shard with no emitted set, same as ack does, instead of failing with a NullPointerException
+            TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+            if (emitted != null) {
+                emitted.remove(sequenceNumber);
+            }
+        } else {
+            ack(kinesisMessageId);
+        }
+        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
+        if (deactivated) {
+            commit();
+        }
+    }
+
+    /**
+     * Commits, for every shard assigned to this task, the highest acked sequence number that is safe to commit to zookeeper.
+     */
+    void commit () {
+        // Per shard there are three mutually disjoint sorted sets: emittedPerShard, ackedPerShard and failedPerShard. A record starts in emittedPerShard,
+        // moves to ackedPerShard on ack, or to failedPerShard on fail when the retry service wants a retry, and moves back from failedPerShard to
+        // emittedPerShard when it is emitted again. The sequence number to commit is the highest acked X such that no emitted or failed Y satisfies
+        // X > Y. For e.g. if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and failedPerShard is 3,7 then only 1 can be committed and not 4, because
+        // 2 is still pending and 3 has failed
+        for (String shardId: toEmitPerShard.keySet()) {
+            TreeSet<BigInteger> ackedForShard = ackedPerShard.get(shardId);
+            if (ackedForShard == null) {
+                continue;
+            }
+            // exclusive upper bound: the smallest sequence number that is failed or still in flight, null when there is none
+            TreeSet<BigInteger> failedForShard = failedPerShard.get(shardId);
+            TreeSet<BigInteger> emittedForShard = emittedPerShard.get(shardId);
+            BigInteger exclusiveBound = (failedForShard == null || failedForShard.isEmpty()) ? null : failedForShard.first();
+            if (emittedForShard != null && !emittedForShard.isEmpty()) {
+                BigInteger lowestEmitted = emittedForShard.first();
+                exclusiveBound = (exclusiveBound == null) ? lowestEmitted : exclusiveBound.min(lowestEmitted);
+            }
+            // drain acked sequence numbers below the bound, in ascending order; the last one drained is the one to commit
+            BigInteger sequenceNumberToCommit = null;
+            while (!ackedForShard.isEmpty() && (exclusiveBound == null || ackedForShard.first().compareTo(exclusiveBound) < 0)) {
+                sequenceNumberToCommit = ackedForShard.pollFirst();
+            }
+            if (sequenceNumberToCommit == null) {
+                continue;
+            }
+            Map<Object, Object> state = new HashMap<>();
+            state.put("committedSequenceNumber", sequenceNumberToCommit.toString());
+            LOG.debug("Committing sequence number " + sequenceNumberToCommit.toString() + " for shardId " + shardId);
+            zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
+        }
+        lastCommitTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Marks the manager as active again and re-initializes the kinesis connection (which deactivate shuts down).
+     */
+    void activate () {
+        LOG.info("Activate called");
+        this.deactivated = false;
+        this.kinesisConnection.initialize();
+    }
+
+    /**
+     * Marks the manager as deactivated, commits what can be committed and shuts down the kinesis connection.
+     * While deactivated, every ack and fail triggers a commit.
+     */
+    void deactivate () {
+        LOG.info("Deactivate called");
+        this.deactivated = true;
+        this.commit();
+        this.kinesisConnection.shutdown();
+    }
+
+    /**
+     * Commits one last time and then shuts down the kinesis and zk connections.
+     */
+    void close () {
+        this.commit();
+        this.kinesisConnection.shutdown();
+        this.zkConnection.shutdown();
+    }
+
+    /**
+     * Fetches records from kinesis starting at the sequence number of the given failed message. Every fetched record that
+     * is currently in the failed set of that shard is cached in failedandFetchedRecords, to avoid going to kinesis again
+     * when those records are retried.
+     * <p>
+     * Sleeps for 1 second when kinesis returns no records or throttles the request. If the thread is interrupted while
+     * sleeping, the interrupt status is restored so that the caller can observe it.
+     *
+     * @param kinesisMessageId id of the failed message to fetch
+     */
+    private void fetchFailedRecords (KinesisMessageId kinesisMessageId) {
+        // if shard iterator not present for this message, get it
+        if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
+            refreshShardIteratorForFailedRecord(kinesisMessageId);
+        }
+        String shardIterator = shardIteratorPerFailedMessage.get(kinesisMessageId);
+        LOG.debug("Fetching failed records for shard id :" + kinesisMessageId.getShardId() + " at sequence number " + kinesisMessageId.getSequenceNumber() +
+                " using shardIterator " + shardIterator);
+        try {
+            GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
+            if (getRecordsResult != null) {
+                List<Record> records = getRecordsResult.getRecords();
+                LOG.debug("Records size from fetchFailedRecords is " + records.size());
+                // update the shard iterator to next one in case this fetch does not give the message.
+                shardIteratorPerFailedMessage.put(kinesisMessageId, getRecordsResult.getNextShardIterator());
+                if (records.size() == 0) {
+                    LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
+                    Thread.sleep(1000);
+                } else {
+                    // add all fetched records to the set of failed records if they are present in failed set
+                    for (Record record: records) {
+                        KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), kinesisMessageId.getShardId(), record.getSequenceNumber());
+                        if (failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(current.getSequenceNumber()))) {
+                            failedandFetchedRecords.put(current, record);
+                            // the record itself is cached now, so its iterator is no longer needed
+                            shardIteratorPerFailedMessage.remove(current);
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException ie) {
+            LOG.warn("Thread interrupted while sleeping", ie);
+            // do not swallow the interrupt: restore the flag for whoever owns this thread
+            Thread.currentThread().interrupt();
+        } catch (ExpiredIteratorException ex) {
+            LOG.warn("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
+            refreshShardIteratorForFailedRecord(kinesisMessageId);
+        } catch (ProvisionedThroughputExceededException pe) {
+            try {
+                LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LOG.warn("Thread interrupted exception", e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Fetches new records for every shard assigned to this task, using the shard iterator stored for the shard, and
+     * queues them in toEmitPerShard. The stored iterator is advanced to the next one after each fetch and the sequence
+     * number of the last fetched record is remembered per shard.
+     * <p>
+     * Sleeps for 1 second when kinesis returns no records or throttles the request. If the thread is interrupted while
+     * sleeping, the interrupt status is restored so that the caller can observe it.
+     */
+    private void fetchNewRecords () {
+        for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) {
+            String shardId = entry.getKey();
+            try {
+                String shardIterator = shardIteratorPerShard.get(shardId);
+                LOG.debug("Fetching new records for shard id :" + shardId + " using shardIterator " + shardIterator + " after sequence number " +
+                        fetchedSequenceNumberPerShard.get(shardId));
+                GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
+                if (getRecordsResult != null) {
+                    List<Record> records = getRecordsResult.getRecords();
+                    LOG.debug("Records size from fetchNewRecords is " + records.size());
+                    // update the shard iterator to next one in case this fetch does not give the message.
+                    shardIteratorPerShard.put(shardId, getRecordsResult.getNextShardIterator());
+                    if (records.size() == 0) {
+                        LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
+                        Thread.sleep(1000);
+                    } else {
+                        entry.getValue().addAll(records);
+                        // remembered so that a refreshed iterator can resume right after the last fetched record
+                        fetchedSequenceNumberPerShard.put(shardId, records.get(records.size() - 1).getSequenceNumber());
+                    }
+                }
+            } catch (InterruptedException ie) {
+                LOG.warn("Thread interrupted while sleeping", ie);
+                // do not swallow the interrupt: restore the flag for whoever owns this thread
+                Thread.currentThread().interrupt();
+            } catch (ExpiredIteratorException ex) {
+                LOG.warn("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator");
+                refreshShardIteratorForNewRecords(shardId);
+            } catch (ProvisionedThroughputExceededException pe) {
+                try {
+                    LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOG.warn("Thread interrupted exception", e);
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    /**
+     * Emits the first queued record that produces a tuple, walking the per shard queues in map iteration order.
+     * Records that do not produce a tuple are consumed (and acked by emitRecord) along the way.
+     *
+     * @param collector collector used to emit the tuple
+     */
+    private void emitNewRecord (SpoutOutputCollector collector) {
+        String streamName = kinesisConfig.getStreamName();
+        for (Map.Entry<String, LinkedList<Record>> shardQueue: toEmitPerShard.entrySet()) {
+            LinkedList<Record> pending = shardQueue.getValue();
+            for (Record next = pending.pollFirst(); next != null; next = pending.pollFirst()) {
+                KinesisMessageId nextId = new KinesisMessageId(streamName, shardQueue.getKey(), next.getSequenceNumber());
+                // stop after the first tuple that actually goes out
+                if (emitRecord(collector, next, nextId)) {
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Re-emits a failed record from the failedandFetchedRecords cache.
+     *
+     * @return true if a tuple was emitted; false if the record is not cached or did not produce a tuple
+     */
+    private boolean emitFailedRecord (SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
+        if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
+            Record cached = failedandFetchedRecords.get(kinesisMessageId);
+            return emitRecord(collector, cached, kinesisMessageId);
+        }
+        return false;
+    }
+
+    /**
+     * Maps the record to a tuple and emits it anchored to the given message id. A record that maps to no tuple
+     * (null or empty) is acked right away instead.
+     *
+     * @return true if a tuple was emitted, false if the record was acked without emitting
+     */
+    private boolean emitRecord (SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
+        List<Object> tuple = kinesisConfig.getRecordToTupleMapper().getTuple(record);
+        if (tuple == null || tuple.isEmpty()) {
+            // ack to not process the record again on restart and move on to next message
+            LOG.warn("Record " + record + " did not return a tuple to emit. Hence acking it");
+            ack(kinesisMessageId);
+            return false;
+        }
+        collector.emit(tuple, kinesisMessageId);
+        // track the sequence number as in flight so that it can be tied back to the ack or fail
+        String shardId = kinesisMessageId.getShardId();
+        TreeSet<BigInteger> emittedForShard = emittedPerShard.get(shardId);
+        if (emittedForShard == null) {
+            emittedForShard = new TreeSet<BigInteger>();
+            emittedPerShard.put(shardId, emittedForShard);
+        }
+        emittedForShard.add(new BigInteger(record.getSequenceNumber()));
+        return true;
+    }
+
+    /**
+     * @return true once at least the configured commit interval has passed since the last commit
+     */
+    private boolean shouldCommit () {
+        long millisSinceLastCommit = System.currentTimeMillis() - lastCommitTime;
+        return millisSinceLastCommit >= kinesisConfig.getZkInfo().getCommitIntervalMs();
+    }
+
+    /**
+     * Seeds fetchedSequenceNumberPerShard, for every shard assigned to this task, with the sequence number last
+     * committed to zk (when there is one).
+     */
+    private void initializeFetchedSequenceNumbers () {
+        String streamName = kinesisConfig.getStreamName();
+        for (String shardId : toEmitPerShard.keySet()) {
+            Map<Object, Object> committedState = zkConnection.readState(streamName, shardId);
+            // nothing stored in zk for this shard
+            if (committedState == null) {
+                continue;
+            }
+            Object committedSequenceNumber = committedState.get("committedSequenceNumber");
+            LOG.info("State read is committedSequenceNumber: " + committedSequenceNumber + " shardId:" + shardId);
+            if (committedSequenceNumber != null) {
+                fetchedSequenceNumberPerShard.put(shardId, (String) committedSequenceNumber);
+            }
+        }
+    }
+
+    /**
+     * Refreshes the shard iterator for new records of every shard assigned to this task.
+     */
+    private void refreshShardIteratorsForNewRecords () {
+        for (String assignedShardId: toEmitPerShard.keySet()) {
+            this.refreshShardIteratorForNewRecords(assignedShardId);
+        }
+    }
+
+    /**
+     * Requests a fresh shard iterator for new records of the given shard and stores it when one is returned.
+     * If a sequence number has already been fetched (or was read from zk) for the shard, the iterator is positioned
+     * right after it; otherwise the configured shard iterator type is used.
+     *
+     * @param shardId shard to refresh the iterator for
+     */
+    private void refreshShardIteratorForNewRecords (String shardId) {
+        String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId);
+        ShardIteratorType shardIteratorType;
+        if (lastFetchedSequenceNumber == null) {
+            shardIteratorType = kinesisConfig.getShardIteratorType();
+        } else {
+            shardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+        }
+        String refreshedIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), shardId, shardIteratorType,
+                lastFetchedSequenceNumber, kinesisConfig.getTimestamp());
+        // keep the previously stored iterator when no usable one came back
+        if (refreshedIterator == null || refreshedIterator.isEmpty()) {
+            return;
+        }
+        LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + refreshedIterator);
+        shardIteratorPerShard.put(shardId, refreshedIterator);
+    }
+
+    /**
+     * Requests a shard iterator positioned exactly at the sequence number of the given failed message and stores it
+     * when one is returned.
+     *
+     * @param kinesisMessageId id of the failed message
+     */
+    private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) {
+        String refreshedIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), kinesisMessageId.getShardId(),
+                ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+        // keep whatever is stored when no usable iterator came back
+        if (refreshedIterator == null || refreshedIterator.isEmpty()) {
+            return;
+        }
+        LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + refreshedIterator);
+        shardIteratorPerFailedMessage.put(kinesisMessageId, refreshedIterator);
+    }
+
+    /**
+     * @return total number of sequence numbers currently held in the emitted, acked and failed sets across all shards
+     */
+    private Long getUncommittedRecordsCount () {
+        long total = 0L;
+        for (TreeSet<BigInteger> emittedForShard: emittedPerShard.values()) {
+            total += emittedForShard.size();
+        }
+        for (TreeSet<BigInteger> ackedForShard: ackedPerShard.values()) {
+            total += ackedForShard.size();
+        }
+        for (TreeSet<BigInteger> failedForShard: failedPerShard.values()) {
+            total += failedForShard.size();
+        }
+        LOG.debug("Returning uncommittedRecordsCount as " + total);
+        return total;
+    }
+
+    /**
+     * @return false if any shard still has fetched records waiting to be emitted, true otherwise
+     */
+    private boolean shouldFetchNewRecords () {
+        for (LinkedList<Record> pending: toEmitPerShard.values()) {
+            // something is still queued, so emit that before going back to kinesis
+            if (!pending.isEmpty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
new file mode 100644
index 0000000..500195b
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+
+import java.util.Map;
+
+public class KinesisSpout extends BaseRichSpout {
+
+    private final KinesisConfig kinesisConfig;
+    private transient KinesisRecordsManager kinesisRecordsManager;
+    private transient SpoutOutputCollector collector;
+
+    public KinesisSpout (KinesisConfig kinesisConfig) {
+        this.kinesisConfig = kinesisConfig;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(kinesisConfig.getRecordToTupleMapper().getOutputFields());
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration () {
+        return super.getComponentConfiguration();
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig);
+        kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
+    }
+
+    @Override
+    public void close() {
+        kinesisRecordsManager.close();
+    }
+
+    @Override
+    public void activate() {
+        kinesisRecordsManager.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        kinesisRecordsManager.deactivate();
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        kinesisRecordsManager.ack((KinesisMessageId) msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        kinesisRecordsManager.fail((KinesisMessageId) msgId);
+    }
+
+    @Override
+    public void nextTuple() {
+        kinesisRecordsManager.next(collector);
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
new file mode 100644
index 0000000..c806539
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+/**
+ * Converts kinesis records into storm tuples for the kinesis spout. The spout declares the fields returned by
+ * {@link #getOutputFields()} and calls {@link #getTuple(Record)} for every record it is about to emit.
+ */
+public interface RecordToTupleMapper {
+    /**
+     * Names of the fields of the tuples produced by {@link #getTuple(Record)}.
+     *
+     * @return names of fields in the emitted tuple
+     */
+    Fields getOutputFields ();
+
+    /**
+     * Maps one kinesis record to the values of the tuple to emit. When this returns null or an empty list the spout
+     * emits nothing for the record and acks it immediately.
+     *
+     * @param record kinesis record
+     * @return storm tuple to be emitted for this record, null if no tuple should be emitted
+     */
+    List<Object> getTuple (Record record);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
new file mode 100644
index 0000000..41151d1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/** Stores and loads per-shard state as JSON in zookeeper, below the node configured in ZkInfo. */
+class ZKConnection {
+
+    private final ZkInfo zkInfo;
+    // has no initializer; assigned by initialize(), which must run before the other methods
+    private CuratorFramework curatorFramework;
+
+    ZKConnection(ZkInfo zkInfo) {
+        this.zkInfo = zkInfo;
+    }
+
+    /** Creates the curator client from the ZkInfo settings (RetryNTimes policy) and starts it. */
+    void initialize() {
+        RetryNTimes retryPolicy = new RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs());
+        curatorFramework = CuratorFrameworkFactory.newClient(
+                zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), retryPolicy);
+        curatorFramework.start();
+    }
+
+    /** Writes state as UTF-8 JSON to the shard's node, creating it (and parents) when it does not exist. */
+    void commitState(String stream, String shardId, Map<Object, Object> state) {
+        final byte[] payload = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
+        try {
+            final String nodePath = getZkPath(stream, shardId);
+            if (curatorFramework.checkExists().forPath(nodePath) != null) {
+                curatorFramework.setData().forPath(nodePath, payload);
+                return;
+            }
+            curatorFramework.create()
+                    .creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+                    .forPath(nodePath, payload);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Returns the state parsed from the shard's node, or null if the node is absent or holds no data. */
+    Map<Object, Object> readState(String stream, String shardId) {
+        try {
+            final String nodePath = getZkPath(stream, shardId);
+            if (curatorFramework.checkExists().forPath(nodePath) == null) {
+                return null;
+            }
+            final byte[] raw = curatorFramework.getData().forPath(nodePath);
+            if (raw == null) {
+                return null;
+            }
+            return (Map<Object, Object>) JSONValue.parse(new String(raw, "UTF-8"));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Closes the curator client. */
+    void shutdown() {
+        curatorFramework.close();
+    }
+
+    /** Joins zkNode, stream and shardId with '/', adding a leading or trailing slash to zkNode only if missing. */
+    private String getZkPath(String stream, String shardId) {
+        final String root = zkInfo.getZkNode();
+        final String lead = root.startsWith("/") ? "" : "/";
+        final String trail = root.endsWith("/") ? "" : "/";
+        return lead + root + trail + stream + "/" + shardId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
new file mode 100644
index 0000000..a47f0ab
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+/**
+ * Immutable, serializable zookeeper settings for the kinesis spout; all values are validated on construction.
+ */
+public class ZkInfo implements Serializable {
+    // comma separated list of zk connect strings to connect to zookeeper e.g. localhost:2181
+    private final String zkUrl;
+    // zk node under which to commit the sequence number of messages. e.g. /committed_sequence_numbers
+    private final String zkNode;
+    // zk session timeout in milliseconds
+    private final Integer sessionTimeoutMs;
+    // zk connection timeout in milliseconds
+    private final Integer connectionTimeoutMs;
+    // interval at which to commit offsets to zk in milliseconds
+    private final Long commitIntervalMs;
+    // number of retry attempts for zk
+    private final Integer retryAttempts;
+    // time to sleep between retries in milliseconds
+    private final Integer retryIntervalMs;
+
+    /**
+     * @throws IllegalArgumentException if zkUrl or zkNode is null or empty, or a numeric argument is null or not positive
+     */
+    public ZkInfo(String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs,
+            Integer retryAttempts, Integer retryIntervalMs) {
+        this.zkUrl = zkUrl;
+        this.zkNode = zkNode;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.connectionTimeoutMs = connectionTimeoutMs;
+        this.commitIntervalMs = commitIntervalMs;
+        this.retryAttempts = retryAttempts;
+        this.retryIntervalMs = retryIntervalMs;
+        validate();
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    public String getZkNode() {
+        return zkNode;
+    }
+
+    public Integer getSessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    public Integer getConnectionTimeoutMs() {
+        return connectionTimeoutMs;
+    }
+
+    public Long getCommitIntervalMs() {
+        return commitIntervalMs;
+    }
+
+    public Integer getRetryAttempts() {
+        return retryAttempts;
+    }
+
+    public Integer getRetryIntervalMs() {
+        return retryIntervalMs;
+    }
+
+    private void validate() {
+        if (zkUrl == null || zkUrl.length() < 1) {
+            throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper");
+        }
+        if (zkNode == null || zkNode.length() < 1) {
+            throw new IllegalArgumentException("zkNode must be specified");
+        }
+        checkPositive(sessionTimeoutMs, "sessionTimeoutMs");
+        checkPositive(connectionTimeoutMs, "connectionTimeoutMs");
+        checkPositive(commitIntervalMs, "commitIntervalMs");
+        checkPositive(retryAttempts, "retryAttempts");
+        checkPositive(retryIntervalMs, "retryIntervalMs");
+    }
+
+    // null is rejected like a non-positive value; || short-circuits so a null is never dereferenced
+    private void checkPositive(Number argument, String name) {
+        if (argument == null || argument.longValue() <= 0) {
+            throw new IllegalArgumentException(name + " must be positive");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ZkInfo{" +
+                "zkUrl='" + zkUrl + '\'' +
+                ", zkNode='" + zkNode + '\'' +
+                ", sessionTimeoutMs=" + sessionTimeoutMs +
+                ", connectionTimeoutMs=" + connectionTimeoutMs +
+                ", commitIntervalMs=" + commitIntervalMs +
+                ", retryAttempts=" + retryAttempts +
+                ", retryIntervalMs=" + retryIntervalMs +
+                '}';
+    }
+}