You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/08/03 04:58:18 UTC

[1/6] storm git commit: STORM-1839: Storm spout implementation for Amazon Kinesis Streams.

Repository: storm
Updated Branches:
  refs/heads/master 8dd7be08e -> 4abd6e926


http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
new file mode 100644
index 0000000..eaa0246
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
@@ -0,0 +1,32 @@
+package org.apache.storm.kinesis.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Terminal bolt for the Kinesis spout test topology: it prints every tuple it receives and acks it.
+ * It never emits anything downstream.
+ */
+public class KinesisBoltTest extends BaseRichBolt {
+    protected static final Logger LOG = LoggerFactory.getLogger(KinesisBoltTest.class);
+    // assigned in prepare(); used by execute() to ack tuples
+    private transient OutputCollector collector;
+
+    /**
+     * Stores the collector so that {@link #execute(Tuple)} can ack incoming tuples.
+     *
+     * @param stormConf topology configuration (unused)
+     * @param context   topology context (unused)
+     * @param collector collector used to ack tuples
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    /**
+     * Writes the tuple to the debug log and to standard out, then acks it.
+     *
+     * @param input the tuple to print and ack
+     */
+    @Override
+    public void execute(Tuple input) {
+        // render once and reuse the same text for both sinks
+        final String rendered = "input = [" + input + "]";
+        LOG.debug(rendered);
+        System.out.println(rendered);
+        collector.ack(input);
+    }
+
+    /**
+     * Declares nothing: this bolt has no output fields.
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // intentionally empty
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
new file mode 100644
index 0000000..d8fca64
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
@@ -0,0 +1,38 @@
+package org.apache.storm.kinesis.spout.test;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kinesis.spout.CredentialsProviderChain;
+import org.apache.storm.kinesis.spout.ExponentialBackoffRetrier;
+import org.apache.storm.kinesis.spout.KinesisConnectionInfo;
+import org.apache.storm.kinesis.spout.KinesisSpout;
+import org.apache.storm.kinesis.spout.RecordToTupleMapper;
+import org.apache.storm.kinesis.spout.ZkInfo;
+import org.apache.storm.topology.TopologyBuilder;
+
+import java.util.Date;
+
+/**
+ * Sample topology that wires a {@link KinesisSpout} (parallelism 3) to a {@link KinesisBoltTest} (parallelism 1) using
+ * shuffle grouping and submits it through {@link StormSubmitter}.
+ * <p>
+ * Usage: {@code KinesisSpoutTopology <topologyName> <streamName>}
+ */
+public class KinesisSpoutTopology {
+    /**
+     * Builds and submits the sample topology.
+     *
+     * @param args {@code args[0]} is the name the topology is submitted under; {@code args[1]} is passed as the first
+     *             argument of the spout config constructor (presumably the Kinesis stream name -- confirm against the
+     *             config class)
+     * @throws IllegalArgumentException if fewer than two arguments are supplied
+     * @throws InvalidTopologyException propagated from {@link StormSubmitter#submitTopology}
+     * @throws AuthorizationException   propagated from {@link StormSubmitter#submitTopology}
+     * @throws AlreadyAliveException    propagated from {@link StormSubmitter#submitTopology}
+     */
+    public static void main (String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
+        // fail with a usage hint instead of an ArrayIndexOutOfBoundsException when an argument is missing
+        if (args == null || args.length < 2) {
+            throw new IllegalArgumentException("Usage: KinesisSpoutTopology <topologyName> <streamName>");
+        }
+        String topologyName = args[0];
+        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+                1000);
+        // fully qualified because org.apache.storm.Config is imported for the topology configuration below
+        org.apache.storm.kinesis.spout.Config config = new org.apache.storm.kinesis.spout.Config(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), new ZkInfo(), kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(config);
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("spout", kinesisSpout, 3);
+        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+        Config topologyConfig = new Config();
+        topologyConfig.setDebug(true);
+        topologyConfig.setNumWorkers(3);
+        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
new file mode 100644
index 0000000..f4662b9
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
@@ -0,0 +1,42 @@
+package org.apache.storm.kinesis.spout.test;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.storm.kinesis.spout.RecordToTupleMapper;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test {@link RecordToTupleMapper} that turns a Kinesis {@link Record} into a three-field tuple:
+ * partition key, sequence number and the record payload decoded as UTF-8 text.
+ */
+public class TestRecordToTupleMapper implements RecordToTupleMapper, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(TestRecordToTupleMapper.class);
+
+    /**
+     * @return the field names matching the order of the values produced by {@link #getTuple(Record)}
+     */
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("partitionKey", "sequenceNumber", "data");
+    }
+
+    /**
+     * Maps a record to {@code [partitionKey, sequenceNumber, data]}.
+     *
+     * @param record the Kinesis record to convert
+     * @return the tuple values; {@code data} is the empty string when the payload is not valid UTF-8
+     */
+    @Override
+    public List<Object> getTuple(Record record) {
+        CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+        List<Object> tuple = new ArrayList<>();
+        tuple.add(record.getPartitionKey());
+        tuple.add(record.getSequenceNumber());
+        try {
+            LOG.debug("bytebuffer is {}", record.getData());
+            // decode() advances the position of the buffer it is given; work on a duplicate so the record's own
+            // buffer stays readable if the same record is mapped again
+            String data = decoder.decode(record.getData().duplicate()).toString();
+            LOG.debug("data is {}", data);
+            tuple.add(data);
+        } catch (CharacterCodingException e) {
+            // keep the tuple shape stable: emit an empty payload rather than dropping the record
+            LOG.warn("Exception occurred. Emitting tuple with empty string data", e);
+            tuple.add("");
+        }
+        LOG.debug("Tuple from record is {}", tuple);
+        return tuple;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 31cbe97..3eed0df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -296,6 +296,7 @@
         <module>external/storm-kafka-client</module>
         <module>external/storm-opentsdb</module>
         <module>external/storm-kafka-monitor</module>
+        <module>external/storm-kinesis</module>
     </modules>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 048e016..fcbd290 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -107,6 +107,20 @@
             </includes>
         </fileSet>
         <fileSet>
+            <directory>${project.basedir}/../../external/storm-kinesis/target</directory>
+            <outputDirectory>external/storm-kinesis</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-kinesis</directory>
+            <outputDirectory>external/storm-kinesis</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
+        <fileSet>
             <directory>${project.basedir}/../../external/storm-hdfs/target</directory>
             <outputDirectory>external/storm-hdfs</outputDirectory>
             <includes>


[5/6] storm git commit: Merge branch 'STORM-1839' of https://github.com/priyank5485/storm into STORM-1839

Posted by sr...@apache.org.
Merge branch 'STORM-1839' of https://github.com/priyank5485/storm into STORM-1839


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b23958c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b23958c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b23958c4

Branch: refs/heads/master
Commit: b23958c4df1f0ce4b7869cafc083da5ed139c65e
Parents: 8dd7be0 b59c75c
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Aug 2 21:35:10 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Aug 2 21:35:10 2016 -0700

----------------------------------------------------------------------
 external/storm-kinesis/README.md                | 140 ++++++
 external/storm-kinesis/pom.xml                  |  69 +++
 .../kinesis/spout/CredentialsProviderChain.java |  39 ++
 .../spout/ExponentialBackoffRetrier.java        | 164 +++++++
 .../spout/FailedMessageRetryHandler.java        |  48 ++
 .../storm/kinesis/spout/KinesisConfig.java      | 133 ++++++
 .../storm/kinesis/spout/KinesisConnection.java  | 108 +++++
 .../kinesis/spout/KinesisConnectionInfo.java    | 111 +++++
 .../storm/kinesis/spout/KinesisMessageId.java   |  73 +++
 .../kinesis/spout/KinesisRecordsManager.java    | 449 +++++++++++++++++++
 .../storm/kinesis/spout/KinesisSpout.java       |  86 ++++
 .../kinesis/spout/RecordToTupleMapper.java      |  38 ++
 .../storm/kinesis/spout/ZKConnection.java       |  95 ++++
 .../org/apache/storm/kinesis/spout/ZkInfo.java  | 118 +++++
 .../kinesis/spout/test/KinesisBoltTest.java     |  31 ++
 .../spout/test/KinesisSpoutTopology.java        |  40 ++
 .../spout/test/TestRecordToTupleMapper.java     |  41 ++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 19 files changed, 1798 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b23958c4/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/b23958c4/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------


[6/6] storm git commit: Added STORM-1839 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1839 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4abd6e92
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4abd6e92
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4abd6e92

Branch: refs/heads/master
Commit: 4abd6e9261ecc463fb551c059833397dd3d61208
Parents: b23958c
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Aug 2 21:57:28 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Aug 2 21:57:28 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4abd6e92/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4562eb2..80f3857 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -131,6 +131,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0 
+ * STORM-1839: Storm spout implementation for Amazon Kinesis Streams.
  * STORM-1988: Kafka Offset not showing due to bad classpath.
  * STORM-1987: Fix TridentKafkaWordCount arg handling in distributed mode.
  * STORM-1969: Modify HiveTopology to show usage of non-partition table.


[4/6] storm git commit: STORM-1839: Move zk and kinesis interaction in to its own class.

Posted by sr...@apache.org.
STORM-1839: Move zk and kinesis interaction in to its own class.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b59c75c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b59c75c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b59c75c0

Branch: refs/heads/master
Commit: b59c75c052ee73203775d9acc2cecb7c691de940
Parents: 49c4747
Author: Priyank <ps...@hortonworks.com>
Authored: Mon Aug 1 12:19:23 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Mon Aug 1 12:19:23 2016 -0700

----------------------------------------------------------------------
 .../storm/kinesis/spout/KinesisConnection.java  | 108 +++++++++++
 .../kinesis/spout/KinesisRecordsManager.java    | 187 ++++---------------
 .../storm/kinesis/spout/ZKConnection.java       |  95 ++++++++++
 3 files changed, 238 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
new file mode 100644
index 0000000..dfd9049
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Package-private wrapper around {@link AmazonKinesisClient} that exposes only the calls used by this package:
+ * listing the shards of a stream, obtaining a shard iterator and fetching records.
+ * {@link #initialize()} must be called before any other method since it creates the underlying client.
+ */
+class KinesisConnection {
+    // log under this class's own name so messages are attributed to KinesisConnection
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisConnection.class);
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    // created by initialize(); null until then
+    private AmazonKinesisClient kinesisClient;
+
+    /**
+     * @param kinesisConnectionInfo source of the credentials provider, client configuration, region and records limit
+     */
+    KinesisConnection (KinesisConnectionInfo kinesisConnectionInfo) {
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+    }
+
+    /**
+     * Creates a new Kinesis client from the connection info and points it at the configured region.
+     * Each call replaces the previously held client reference.
+     */
+    void initialize () {
+        kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), kinesisConnectionInfo.getClientConfiguration());
+        kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion()));
+    }
+
+    /**
+     * Collects all shards of a stream, issuing describeStream calls page by page.
+     *
+     * @param stream name of the stream to describe
+     * @return every shard returned across all pages, in the order received
+     */
+    List<Shard> getShardsForStream (String stream) {
+        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+        describeStreamRequest.setStreamName(stream);
+        List<Shard> shards = new ArrayList<>();
+        String exclusiveStartShardId = null;
+        do {
+            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
+            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+            shards.addAll(describeStreamResult.getStreamDescription().getShards());
+            // the next page starts after the last shard seen so far; stop when the service reports no more shards
+            if (describeStreamResult.getStreamDescription().getHasMoreShards() && !shards.isEmpty()) {
+                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
+            } else {
+                exclusiveStartShardId = null;
+            }
+        } while (exclusiveStartShardId != null);
+        LOG.info("Number of shards for stream {} are {}", stream, shards.size());
+        return shards;
+    }
+
+    /**
+     * Requests a shard iterator for the given shard.
+     *
+     * @param stream            name of the stream
+     * @param shardId           id of the shard to iterate
+     * @param shardIteratorType iterator type to request
+     * @param sequenceNumber    starting sequence number; only sent for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER
+     * @param timestamp         starting timestamp; only sent for AT_TIMESTAMP
+     * @return the shard iterator, or the empty string if the request threw (the failure is logged, not rethrown)
+     */
+    String getShardIterator (String stream, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+        String shardIterator = "";
+        try {
+            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
+            getShardIteratorRequest.setStreamName(stream);
+            getShardIteratorRequest.setShardId(shardId);
+            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
+            if (shardIteratorType == ShardIteratorType.AFTER_SEQUENCE_NUMBER || shardIteratorType == ShardIteratorType.AT_SEQUENCE_NUMBER) {
+                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
+            } else if (shardIteratorType == ShardIteratorType.AT_TIMESTAMP) {
+                getShardIteratorRequest.setTimestamp(timestamp);
+            }
+            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
+            if (getShardIteratorResult != null) {
+                shardIterator = getShardIteratorResult.getShardIterator();
+            }
+        } catch (Exception e) {
+            // best effort: callers receive the empty string; the trailing throwable argument is logged with its stack trace
+            LOG.warn("Exception occurred while getting shardIterator for shard {} shardIteratorType {} sequence number {} timestamp {}",
+                    shardId, shardIteratorType, sequenceNumber, timestamp, e);
+        }
+        // normal return path, so this is diagnostic output rather than a warning
+        LOG.debug("Returning shardIterator {} for shardId {} shardIteratorType {} sequenceNumber {} timestamp {}",
+                shardIterator, shardId, shardIteratorType, sequenceNumber, timestamp);
+        return shardIterator;
+    }
+
+    /**
+     * Fetches records using the given shard iterator, limited to the records limit from the connection info.
+     * Exceptions thrown by the client are not caught here.
+     *
+     * @param shardIterator iterator to read from
+     * @return the result returned by the Kinesis client
+     */
+    GetRecordsResult fetchRecords (String shardIterator) {
+        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+        getRecordsRequest.setShardIterator(shardIterator);
+        getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit());
+        return kinesisClient.getRecords(getRecordsRequest);
+    }
+
+    /**
+     * Shuts down the underlying Kinesis client.
+     */
+    void shutdown () {
+        kinesisClient.shutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
index bdd054f..7f3f024 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -18,32 +18,17 @@
 
 package org.apache.storm.kinesis.spout;
 
-import com.amazonaws.regions.Region;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.zookeeper.CreateMode;
-import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -53,8 +38,10 @@ import java.util.TreeSet;
 
 class KinesisRecordsManager {
     private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
-    // zk interaction object
-    private transient CuratorFramework curatorFramework;
+    // object handling zk interaction
+    private transient ZKConnection zkConnection;
+    // object handling interaction with kinesis
+    private transient KinesisConnection kinesisConnection;
     // Kinesis Spout KinesisConfig object
     private transient final KinesisConfig kinesisConfig;
     // Queue of records per shard fetched from kinesis and are waiting to be emitted
@@ -78,18 +65,19 @@ class KinesisRecordsManager {
     private transient long lastCommitTime;
     // boolean to track deactivated state
     private transient boolean deactivated;
-    private transient AmazonKinesisClient kinesisClient;
 
     KinesisRecordsManager (KinesisConfig kinesisConfig) {
         this.kinesisConfig = kinesisConfig;
+        this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+        this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
     }
 
     void initialize (int myTaskIndex, int totalTasks) {
         deactivated = false;
         lastCommitTime = System.currentTimeMillis();
-        initializeKinesisClient();
-        initializeCurator();
-        List<Shard> shards = this.getShards();
+        kinesisConnection.initialize();
+        zkConnection.initialize();
+        List<Shard> shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
         LOG.info("myTaskIndex is " + myTaskIndex);
         LOG.info("totalTasks is " + totalTasks);
         int i = myTaskIndex;
@@ -149,17 +137,24 @@ class KinesisRecordsManager {
         String shardId = kinesisMessageId.getShardId();
         BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
         LOG.debug("Ack received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while committing we need to figure out what is the
+        // highest sequence number that can be committed for this shard
         if (!ackedPerShard.containsKey(shardId)) {
             ackedPerShard.put(shardId, new TreeSet<BigInteger>());
         }
         ackedPerShard.get(shardId).add(sequenceNumber);
+        // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard(which keeps track of in flight tuples)
         if (emittedPerShard.containsKey(shardId)) {
             TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
             emitted.remove(sequenceNumber);
         }
+        // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive coding.
+        // Remove it from failedPerShard anyway
         if (failedPerShard.containsKey(shardId)) {
             failedPerShard.get(shardId).remove(sequenceNumber);
         }
+        // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in failedAndFetchedRecords. We use that to
+        // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to clean up memory
         if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
             kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
             failedandFetchedRecords.remove(kinesisMessageId);
@@ -192,8 +187,13 @@ class KinesisRecordsManager {
     }
 
     void commit () {
-        // Logic for deciding what sequence number to ack is find the highest sequence number from acked called X such that there is no sequence number Y in
-        // emitted or failed that satisfies X > Y. For e.g. is acked is 1,3,5. Emitted is 2,4,6 then we can only commit 1 and not 3 because 2 is still pending
+        // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number can be committed to zookeeper.
+        // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack it moves from emittedPerShard to
+        // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to failedPerShard. The failed records will move from
+        // failedPerShard to emittedPerShard when the failed record is emitted again as a retry.
+        // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard called X such that there is no sequence
+        // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and
+        // failedPerShard is 3,7 then we can only commit 1 and not 4 because 2 is still pending and 3 has failed
         for (String shardId: toEmitPerShard.keySet()) {
             if (ackedPerShard.containsKey(shardId)) {
                 BigInteger commitSequenceNumberBound = null;
@@ -221,8 +221,7 @@ class KinesisRecordsManager {
                     Map<Object, Object> state = new HashMap<>();
                     state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString());
                     LOG.debug("Committing sequence number " + ackedSequenceNumberToCommit.toString() + " for shardId " + shardId);
-                    String path = getZkPath(shardId);
-                    commitState(path, state);
+                    zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
                 }
             }
         }
@@ -232,65 +231,20 @@ class KinesisRecordsManager {
     void activate () {
         LOG.info("Activate called");
         deactivated = false;
-        initializeKinesisClient();
+        kinesisConnection.initialize();
     }
 
     void deactivate () {
         LOG.info("Deactivate called");
         deactivated = true;
         commit();
-        shutdownKinesisClient();
+        kinesisConnection.shutdown();
     }
 
     void close () {
         commit();
-        shutdownKinesisClient();
-        shutdownCurator();
-    }
-
-    private String getZkPath (String shardId) {
-        String path = "";
-        if (!kinesisConfig.getZkInfo().getZkNode().startsWith("/")) {
-            path += "/";
-        }
-        path += kinesisConfig.getZkInfo().getZkNode();
-        if (!kinesisConfig.getZkInfo().getZkNode().endsWith("/")) {
-            path += "/";
-        }
-        path += (kinesisConfig.getStreamName() + "/" + shardId);
-        return path;
-    }
-
-    private void commitState (String path, Map<Object, Object> state) {
-        byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
-        try {
-            if (curatorFramework.checkExists().forPath(path) == null) {
-                curatorFramework.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, bytes);
-            } else {
-                curatorFramework.setData().forPath(path, bytes);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private Map<Object, Object> readState (String path) {
-        try {
-            Map<Object, Object> state = null;
-            byte[] b = null;
-            if (curatorFramework.checkExists().forPath(path) != null) {
-                b = curatorFramework.getData().forPath(path);
-            }
-            if (b != null) {
-                state = (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
-            }
-            return state;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        kinesisConnection.shutdown();
+        zkConnection.shutdown();
     }
 
     // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched and are in the failed queue will also
@@ -304,7 +258,7 @@ class KinesisRecordsManager {
         LOG.debug("Fetching failed records for shard id :" + kinesisMessageId.getShardId() + " at sequence number " + kinesisMessageId.getSequenceNumber() +
                 " using shardIterator " + shardIterator);
         try {
-            GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+            GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
             if (getRecordsResult != null) {
                 List<Record> records = getRecordsResult.getRecords();
                 LOG.debug("Records size from fetchFailedRecords is " + records.size());
@@ -346,7 +300,7 @@ class KinesisRecordsManager {
                 String shardIterator = shardIteratorPerShard.get(shardId);
                 LOG.debug("Fetching new records for shard id :" + shardId + " using shardIterator " + shardIterator + " after sequence number " +
                         fetchedSequenceNumberPerShard.get(shardId));
-                GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+                GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
                 if (getRecordsResult != null) {
                     List<Record> records = getRecordsResult.getRecords();
                     LOG.debug("Records size from fetchNewRecords is " + records.size());
@@ -376,34 +330,6 @@ class KinesisRecordsManager {
         }
     }
 
-    private GetRecordsResult fetchRecords (String shardIterator) {
-        List<Record> records = new ArrayList<>();
-        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
-        getRecordsRequest.setShardIterator(shardIterator);
-        getRecordsRequest.setLimit(kinesisConfig.getKinesisConnectionInfo().getRecordsLimit());
-        GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
-        return getRecordsResult;
-    }
-
-    private List<Shard> getShards () {
-        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-        describeStreamRequest.setStreamName(kinesisConfig.getStreamName());
-        List<Shard> shards = new ArrayList<>();
-        String exclusiveStartShardId = null;
-        do {
-            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
-            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
-            shards.addAll(describeStreamResult.getStreamDescription().getShards());
-            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
-                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
-            } else {
-                exclusiveStartShardId = null;
-            }
-        } while ( exclusiveStartShardId != null );
-        LOG.info("Number of shards for stream " + kinesisConfig.getStreamName() + " are " + shards.size());
-        return shards;
-    }
-
     private void emitNewRecord (SpoutOutputCollector collector) {
         for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
             String shardId = entry.getKey();
@@ -450,7 +376,7 @@ class KinesisRecordsManager {
 
     private void initializeFetchedSequenceNumbers () {
         for (String shardId : toEmitPerShard.keySet()) {
-            Map<Object, Object> state = readState(getZkPath(shardId));
+            Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId);
             // if state found for this shard in zk, then set the sequence number in fetchedSequenceNumber
             if (state != null) {
                 Object committedSequenceNumber = state.get("committedSequenceNumber");
@@ -474,7 +400,8 @@ class KinesisRecordsManager {
         ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? kinesisConfig.getShardIteratorType() : ShardIteratorType
                 .AFTER_SEQUENCE_NUMBER);
         // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = this.getShardIterator(shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig.getTimestamp());
+        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig
+                .getTimestamp());
         if (shardIterator != null && !shardIterator.isEmpty()) {
             LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator);
             shardIteratorPerShard.put(shardId, shardIterator);
@@ -484,38 +411,14 @@ class KinesisRecordsManager {
     private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) {
         String shardIterator = null;
         // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = this.getShardIterator(kinesisMessageId.getShardId(), ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), kinesisMessageId.getShardId(), ShardIteratorType
+                .AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
         if (shardIterator != null && !shardIterator.isEmpty()) {
             LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
             shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
         }
     }
 
-    private String getShardIterator (String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
-        String shardIterator = "";
-        try {
-            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
-            getShardIteratorRequest.setStreamName(kinesisConfig.getStreamName());
-            getShardIteratorRequest.setShardId(shardId);
-            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
-            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
-                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
-            } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
-                getShardIteratorRequest.setTimestamp(timestamp);
-            }
-            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
-            if (getShardIteratorResult != null) {
-                shardIterator = getShardIteratorResult.getShardIterator();
-            }
-        } catch (Exception e) {
-            LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
-                    sequenceNumber + " timestamp " + timestamp, e);
-        }
-        LOG.warn("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
-                sequenceNumber + " timestamp" + timestamp);
-        return shardIterator;
-    }
-
     private Long getUncommittedRecordsCount () {
         Long result = 0L;
         for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) {
@@ -543,24 +446,4 @@ class KinesisRecordsManager {
         return fetchRecords;
     }
 
-    private void initializeCurator () {
-        ZkInfo zkInfo = kinesisConfig.getZkInfo();
-        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
-                RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
-        curatorFramework.start();
-    }
-
-    private void initializeKinesisClient () {
-        kinesisClient = new AmazonKinesisClient(kinesisConfig.getKinesisConnectionInfo().getCredentialsProvider(), kinesisConfig.getKinesisConnectionInfo().getClientConfiguration());
-        kinesisClient.setRegion(Region.getRegion(kinesisConfig.getKinesisConnectionInfo().getRegion()));
-    }
-
-    private void shutdownCurator () {
-        curatorFramework.close();
-    }
-
-    private void shutdownKinesisClient () {
-        kinesisClient.shutdown();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
new file mode 100644
index 0000000..41151d1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+class ZKConnection { // wraps a Curator client used to store and load per-shard state as JSON in zookeeper
+    // connection settings and the root node come from zkInfo; curatorFramework stays null until initialize() is called
+    private final ZkInfo zkInfo;
+    private CuratorFramework curatorFramework;
+    // only stores the zkInfo reference; no zookeeper client is created here
+    ZKConnection (ZkInfo zkInfo) {
+        this.zkInfo = zkInfo;
+    }
+    // creates the Curator client from zkInfo (zk url, session timeout, connection timeout, RetryNTimes policy) and starts it
+    void initialize () {
+        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
+                RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
+        curatorFramework.start();
+    }
+    // writes state, serialized as UTF-8 JSON, to the zk node derived from stream and shardId
+    void commitState (String stream, String shardId, Map<Object, Object> state) {
+        byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
+        try {
+            String path = getZkPath(stream, shardId);
+            if (curatorFramework.checkExists().forPath(path) == null) { // node absent: create it, including missing parents, as a persistent node
+                curatorFramework.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, bytes);
+            } else { // node present: overwrite its data
+                curatorFramework.setData().forPath(path, bytes);
+            }
+        } catch (Exception e) { // any failure is rethrown unchecked with the original exception as the cause
+            throw new RuntimeException(e);
+        }
+    }
+    // returns the state parsed from the zk node for stream and shardId, or null when the node does not exist or holds no data
+    Map<Object, Object> readState (String stream, String shardId) {
+        try {
+            String path = getZkPath(stream, shardId);
+            Map<Object, Object> state = null;
+            byte[] b = null;
+            if (curatorFramework.checkExists().forPath(path) != null) {
+                b = curatorFramework.getData().forPath(path);
+            }
+            if (b != null) {
+                state = (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8")); // NOTE(review): unchecked cast - assumes the stored JSON is an object; confirm
+            }
+            return state;
+        } catch (Exception e) { // any failure is rethrown unchecked with the original exception as the cause
+            throw new RuntimeException(e);
+        }
+    }
+    // closes the Curator client; NOTE(review): throws NullPointerException if initialize() was never called
+    void shutdown () {
+        curatorFramework.close();
+    }
+    // builds "/<zkNode>/<stream>/<shardId>", adding the slash before and after zkNode only where zkNode lacks it
+    private String getZkPath (String stream, String shardId) {
+        String path = "";
+        if (!zkInfo.getZkNode().startsWith("/")) {
+            path += "/";
+        }
+        path += zkInfo.getZkNode();
+        if (!zkInfo.getZkNode().endsWith("/")) {
+            path += "/";
+        }
+        path += (stream + "/" + shardId);
+        return path;
+    }
+}


[3/6] storm git commit: STORM-1839: Address review comments

Posted by sr...@apache.org.
STORM-1839: Address review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/49c47474
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/49c47474
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/49c47474

Branch: refs/heads/master
Commit: 49c47474dbbf5317a8aee68cd877b9bdef4ec27e
Parents: de68c26
Author: Priyank <ps...@hortonworks.com>
Authored: Tue Jul 26 10:23:51 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Tue Jul 26 11:47:11 2016 -0700

----------------------------------------------------------------------
 external/storm-kinesis/README.md                |  19 ++-
 .../org/apache/storm/kinesis/spout/Config.java  | 166 -------------------
 .../spout/ExponentialBackoffRetrier.java        |   4 +-
 .../storm/kinesis/spout/KinesisConfig.java      | 133 +++++++++++++++
 .../kinesis/spout/KinesisConnectionInfo.java    |  26 ---
 .../kinesis/spout/KinesisRecordsManager.java    |  88 +++++-----
 .../storm/kinesis/spout/KinesisSpout.java       |  12 +-
 .../org/apache/storm/kinesis/spout/ZkInfo.java  |  35 ----
 .../kinesis/spout/test/KinesisBoltTest.java     |   3 +-
 .../spout/test/KinesisSpoutTopology.java        |   8 +-
 .../spout/test/TestRecordToTupleMapper.java     |   7 +-
 11 files changed, 203 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/README.md b/external/storm-kinesis/README.md
index 8ff2816..f163a54 100644
--- a/external/storm-kinesis/README.md
+++ b/external/storm-kinesis/README.md
@@ -11,9 +11,10 @@ public class KinesisSpoutTopology {
         RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
         KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
                 1000);
-        org.apache.storm.kinesis.spout.Config config = new org.apache.storm.kinesis.spout.Config(args[1], ShardIteratorType.TRIM_HORIZON,
-                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), new ZkInfo(), kinesisConnectionInfo, 10000L);
-        KinesisSpout kinesisSpout = new KinesisSpout(config);
+        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout("spout", kinesisSpout, 3);
         topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
@@ -24,7 +25,7 @@ public class KinesisSpoutTopology {
     }
 }
 ```
-As you can see above the spout takes an object of Config in its constructor. The constructor of Config takes 8 objects as explained below.
+As you can see above the spout takes an object of KinesisConfig in its constructor. The constructor of KinesisConfig takes 8 objects as explained below.
 
 #### `String` streamName
 name of kinesis stream to consume data from
@@ -33,6 +34,7 @@ name of kinesis stream to consume data from
 3 types are supported - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. By default this argument is ignored if state for shards 
 is found in zookeeper. Hence they will apply the first time a topology is started. If you want to use any of these in subsequent runs of the topology, you 
 will need to clear the state of zookeeper node used for storing sequence numbers
+
 #### `RecordToTupleMapper` recordToTupleMapper
 an implementation of `RecordToTupleMapper` interface that spout will call to convert a kinesis record to a storm tuple. It has two methods. getOutputFields 
 tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked
@@ -69,11 +71,10 @@ message when getNextFailedMessageToRetry is called again
 acked will be called once the failed message was re-emitted and successfully acked by the spout. If it was failed by the spout failed will be called again
 
 #### `ZkInfo` zkInfo
-an object encapsulating information for zookeeper interaction. It has two constructors. Default no args constructor takes zkUrl as first argument which 
-is a comma separated string of zk host and port, zkNode as second that will be used as the root node for storing committed sequence numbers, session timeout
-as third in milliseconds, connection timeout as fourth in milliseconds, commit interval as fifth in milliseconds for committing sequence numbers to zookeeper, 
-retry attempts as sixth for zk client connection retry attempts, retry interval as seventh in milliseconds for time to wait before retrying to connect. Default 
-constructor uses the values ["localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000]
+an object encapsulating information for zookeeper interaction. The constructor takes seven arguments, in this order: zkUrl, a comma-separated string of zk
+host and port; zkNode, the root node under which committed sequence numbers are stored; session timeout in milliseconds; connection timeout in milliseconds;
+commit interval in milliseconds, for committing sequence numbers to zookeeper; retry attempts, for zk client connection retry attempts; and retry interval
+in milliseconds, the time to wait before retrying to connect.
 
 #### `KinesisConnectionInfo` kinesisConnectionInfo
 an object that captures arguments for connecting to kinesis using kinesis client. It has a constructor that takes an implementation of `AWSCredentialsProvider`

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
deleted file mode 100644
index 5be8de9..0000000
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kinesis.spout;
-
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.Date;
-
-public class Config implements Serializable {
-    // kinesis stream name to read from
-    private final String streamName;
-    // shard iterator type based on kinesis api - beginning of time, latest, at timestamp are only supported
-    private final ShardIteratorType shardIteratorType;
-    // implementation for converting a Kinesis record to a storm tuple
-    private final RecordToTupleMapper recordToTupleMapper;
-    // timestamp to be used for shardIteratorType AT_TIMESTAMP - can be null
-    private final Date timestamp;
-    // implementation for handling the failed messages retry logic
-    private final FailedMessageRetryHandler failedMessageRetryHandler;
-    // object capturing all zk related information for storing committed sequence numbers
-    private final ZkInfo zkInfo;
-    // object representing information on paramaters to use while connecting to kinesis using kinesis client
-    private final KinesisConnectionInfo kinesisConnectionInfo;
-    // this number represents the number of messages that are still not committed to zk. it will prevent the spout from emitting further.
-    // for e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still cant be committed to zk because 1 is still in failed set. As a result
-    // the acked queue can infinitely grow without any of them being committed to zk. topology max pending does not help since from storm's view they are acked
-    private final Long maxUncommittedRecords;
-
-    public Config (String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp, FailedMessageRetryHandler
-            failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) {
-        this.streamName = streamName;
-        this.shardIteratorType = shardIteratorType;
-        this.recordToTupleMapper = recordToTupleMapper;
-        this.timestamp = timestamp;
-        this.failedMessageRetryHandler = failedMessageRetryHandler;
-        this.zkInfo = zkInfo;
-        this.kinesisConnectionInfo = kinesisConnectionInfo;
-        this.maxUncommittedRecords = maxUncommittedRecords;
-        validate();
-    }
-
-    private void validate () {
-        if (streamName == null || streamName.length() < 1) {
-            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
-        }
-        if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType
-                .AT_SEQUENCE_NUMBER)) {
-            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST +
-                    "," + ShardIteratorType.TRIM_HORIZON);
-        }
-        if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) {
-            throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
-        }
-        if (recordToTupleMapper == null) {
-            throw new IllegalArgumentException("recordToTupleMapper cannot be null");
-        }
-        if (failedMessageRetryHandler == null) {
-            throw new IllegalArgumentException("failedMessageRetryHandler cannot be null");
-        }
-        if (zkInfo == null) {
-            throw new IllegalArgumentException("zkInfo cannot be null");
-        }
-        if (kinesisConnectionInfo == null) {
-            throw new IllegalArgumentException("kinesisConnectionInfo cannot be null");
-        }
-        if (maxUncommittedRecords == null || maxUncommittedRecords < 1) {
-            throw new IllegalArgumentException("maxUncommittedRecords has to be a positive integer");
-        }
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public ShardIteratorType getShardIteratorType() {
-        return shardIteratorType;
-    }
-
-    public RecordToTupleMapper getRecordToTupleMapper() {
-        return recordToTupleMapper;
-    }
-
-    public Date getTimestamp() {
-        return timestamp;
-    }
-
-    public FailedMessageRetryHandler getFailedMessageRetryHandler () {
-        return failedMessageRetryHandler;
-    }
-
-    public ZkInfo getZkInfo () {
-        return zkInfo;
-    }
-
-    public KinesisConnectionInfo getKinesisConnectionInfo () {
-        return kinesisConnectionInfo;
-    }
-
-    public Long getMaxUncommittedRecords () {
-        return maxUncommittedRecords;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        Config config = (Config) o;
-
-        if (streamName != null ? !streamName.equals(config.streamName) : config.streamName != null) return false;
-        if (shardIteratorType != config.shardIteratorType) return false;
-        if (recordToTupleMapper != null ? !recordToTupleMapper.equals(config.recordToTupleMapper) : config.recordToTupleMapper != null) return false;
-        if (timestamp != null ? !timestamp.equals(config.timestamp) : config.timestamp != null) return false;
-        if (zkInfo != null ? !zkInfo.equals(config.zkInfo) : config.zkInfo != null) return false;
-        if (kinesisConnectionInfo != null ? !kinesisConnectionInfo.equals(config.kinesisConnectionInfo) : config.kinesisConnectionInfo != null) return false;
-        if (maxUncommittedRecords != null ? !maxUncommittedRecords.equals(config.maxUncommittedRecords) : config.maxUncommittedRecords != null) return false;
-        return !(failedMessageRetryHandler != null ? !failedMessageRetryHandler.equals(config.failedMessageRetryHandler) : config.failedMessageRetryHandler
-                != null);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = streamName != null ? streamName.hashCode() : 0;
-        result = 31 * result + (shardIteratorType != null ? shardIteratorType.hashCode() : 0);
-        result = 31 * result + (recordToTupleMapper != null ? recordToTupleMapper.hashCode() : 0);
-        result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
-        result = 31 * result + (zkInfo != null ? zkInfo.hashCode() : 0);
-        result = 31 * result + (kinesisConnectionInfo != null ? kinesisConnectionInfo.hashCode() : 0);
-        result = 31 * result + (failedMessageRetryHandler != null ? failedMessageRetryHandler.hashCode() : 0);
-        result = 31 * result + (maxUncommittedRecords != null ? maxUncommittedRecords.hashCode() : 0);
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "Config{" +
-                "streamName='" + streamName + '\'' +
-                ", shardIteratorType=" + shardIteratorType +
-                ", recordToTupleMapper=" + recordToTupleMapper +
-                ", timestamp=" + timestamp +
-                ", zkInfo=" + zkInfo +
-                ", kinesisConnectionInfo=" + kinesisConnectionInfo +
-                ", failedMessageRetryHandler =" + failedMessageRetryHandler +
-                ", maxUncommittedRecords=" + maxUncommittedRecords +
-                '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
index f357f30..2a702f8 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
@@ -80,7 +80,7 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
         LOG.debug("Handling failed message " + messageId);
         // if maxRetries is 0, dont retry and return false as per interface contract
         if (maxRetries == 0) {
-            LOG.debug("maxRetries set to 0. Hence not queueing " + messageId);
+            LOG.warn("maxRetries set to 0. Hence not queueing " + messageId);
             return false;
         }
         // if first failure add it to the count map
@@ -92,7 +92,7 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
         failCounts.put(messageId, ++failCount);
         // if fail count is greater than maxRetries, discard or ack. for e.g. for maxRetries 3, 4 failures are allowed at maximum
         if (failCount > maxRetries) {
-            LOG.debug("maxRetries reached so dropping " + messageId);
+            LOG.warn("maxRetries reached so dropping " + messageId);
             failCounts.remove(messageId);
             return false;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
new file mode 100644
index 0000000..744aef5
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class KinesisConfig implements Serializable {
+    // name of the kinesis stream to read from; validate rejects null and the empty string
+    private final String streamName;
+    // shard iterator type as defined by the kinesis api; validate rejects null, AFTER_SEQUENCE_NUMBER and AT_SEQUENCE_NUMBER
+    private final ShardIteratorType shardIteratorType;
+    // implementation for converting a Kinesis record to a storm tuple; must not be null
+    private final RecordToTupleMapper recordToTupleMapper;
+    // timestamp to be used for shardIteratorType AT_TIMESTAMP (required in that case); can be null for the other iterator types
+    private final Date timestamp;
+    // implementation for handling the retry logic of failed messages; must not be null
+    private final FailedMessageRetryHandler failedMessageRetryHandler;
+    // object capturing all zk related information for storing committed sequence numbers; must not be null
+    private final ZkInfo zkInfo;
+    // object representing information on parameters to use while connecting to kinesis using kinesis client; must not be null
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    // upper bound (at least 1) on the number of messages that are still not committed to zk. once it is reached the spout stops emitting further.
+    // e.g. if 1 failed and 2,3,4,5..... have all been acked by storm, they still can't be committed to zk because 1 is still in the failed set. As a result
+    // the acked queue could grow without bound with none of them being committed to zk. topology max pending does not help since from storm's view they are acked
+    private final Long maxUncommittedRecords;
+
+    public KinesisConfig(String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp, FailedMessageRetryHandler
+            failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) {
+        this.streamName = streamName;
+        this.shardIteratorType = shardIteratorType;
+        this.recordToTupleMapper = recordToTupleMapper;
+        this.timestamp = timestamp;
+        this.failedMessageRetryHandler = failedMessageRetryHandler;
+        this.zkInfo = zkInfo;
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+        this.maxUncommittedRecords = maxUncommittedRecords;
+        validate(); // runs after all fields are assigned; throws IllegalArgumentException on an invalid configuration
+    }
+
+    private void validate () { // checks the fields in order and throws IllegalArgumentException on the first violated constraint
+        if (streamName == null || streamName.length() < 1) {
+            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
+        }
+        if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType
+                .AT_SEQUENCE_NUMBER)) {
+            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST +
+                    "," + ShardIteratorType.TRIM_HORIZON);
+        }
+        if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) { // timestamp is only mandatory for AT_TIMESTAMP
+            throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
+        }
+        if (recordToTupleMapper == null) {
+            throw new IllegalArgumentException("recordToTupleMapper cannot be null");
+        }
+        if (failedMessageRetryHandler == null) {
+            throw new IllegalArgumentException("failedMessageRetryHandler cannot be null");
+        }
+        if (zkInfo == null) {
+            throw new IllegalArgumentException("zkInfo cannot be null");
+        }
+        if (kinesisConnectionInfo == null) {
+            throw new IllegalArgumentException("kinesisConnectionInfo cannot be null");
+        }
+        if (maxUncommittedRecords == null || maxUncommittedRecords < 1) { // null is checked first, so the comparison never unboxes null
+            throw new IllegalArgumentException("maxUncommittedRecords has to be a positive integer");
+        }
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public ShardIteratorType getShardIteratorType() {
+        return shardIteratorType;
+    }
+
+    public RecordToTupleMapper getRecordToTupleMapper() {
+        return recordToTupleMapper;
+    }
+
+    public Date getTimestamp() { // returns the stored Date instance itself (no copy is made); null unless one was supplied
+        return timestamp;
+    }
+
+    public FailedMessageRetryHandler getFailedMessageRetryHandler () {
+        return failedMessageRetryHandler;
+    }
+
+    public ZkInfo getZkInfo () {
+        return zkInfo;
+    }
+
+    public KinesisConnectionInfo getKinesisConnectionInfo () {
+        return kinesisConnectionInfo;
+    }
+
+    public Long getMaxUncommittedRecords () {
+        return maxUncommittedRecords;
+    }
+
+    @Override
+    public String toString() { // lists every field; relies on the toString of the nested config objects
+        return "KinesisConfig{" +
+                "streamName='" + streamName + '\'' +
+                ", shardIteratorType=" + shardIteratorType +
+                ", recordToTupleMapper=" + recordToTupleMapper +
+                ", timestamp=" + timestamp +
+                ", zkInfo=" + zkInfo +
+                ", kinesisConnectionInfo=" + kinesisConnectionInfo +
+                ", failedMessageRetryHandler =" + failedMessageRetryHandler +
+                ", maxUncommittedRecords=" + maxUncommittedRecords +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
index 5d9454a..67ca29f 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
@@ -58,9 +58,6 @@ public class KinesisConnectionInfo implements Serializable {
         serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration);
         this.recordsLimit = recordsLimit;
         this.region = region;
-
-        this.credentialsProvider = null;
-        this.clientConfiguration = null;
     }
 
     public Integer getRecordsLimit() {
@@ -103,29 +100,6 @@ public class KinesisConnectionInfo implements Serializable {
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        KinesisConnectionInfo that = (KinesisConnectionInfo) o;
-
-        if (!Arrays.equals(serializedKinesisCredsProvider, that.serializedKinesisCredsProvider)) return false;
-        if (!Arrays.equals(serializedkinesisClientConfig, that.serializedkinesisClientConfig)) return false;
-        if (region != null ? !region.equals(that.region) : that.region != null) return false;
-        return !(recordsLimit != null ? !recordsLimit.equals(that.recordsLimit) : that.recordsLimit != null);
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = serializedKinesisCredsProvider != null ? Arrays.hashCode(serializedKinesisCredsProvider) : 0;
-        result = 31 * result + (serializedkinesisClientConfig != null ? Arrays.hashCode(serializedkinesisClientConfig) : 0);
-        result = 31 * result + (region != null ? region.hashCode() : 0);
-        result = 31 * result + (recordsLimit != null ? recordsLimit.hashCode() : 0);
-        return result;
-    }
-
-    @Override
     public String toString() {
         return "KinesisConnectionInfo{" +
                 "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider) +

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
index df2f97d..bdd054f 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -55,8 +55,8 @@ class KinesisRecordsManager {
     private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
     // zk interaction object
     private transient CuratorFramework curatorFramework;
-    // Kinesis Spout Config object
-    private transient final Config config;
+    // configuration object for the Kinesis spout
+    private transient final KinesisConfig kinesisConfig;
     // Queue of records per shard fetched from kinesis and are waiting to be emitted
     private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
     // Map of records  that were fetched from kinesis as a result of failure and are waiting to be emitted
@@ -80,8 +80,8 @@ class KinesisRecordsManager {
     private transient boolean deactivated;
     private transient AmazonKinesisClient kinesisClient;
 
-    KinesisRecordsManager (Config config) {
-        this.config = config;
+    KinesisRecordsManager (KinesisConfig kinesisConfig) {
+        this.kinesisConfig = kinesisConfig;
     }
 
     void initialize (int myTaskIndex, int totalTasks) {
@@ -106,7 +106,7 @@ class KinesisRecordsManager {
         if (shouldCommit()) {
             commit();
         }
-        KinesisMessageId failedMessageId = config.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
+        KinesisMessageId failedMessageId = kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
         if (failedMessageId  != null) {
             // if the retry service returns a message that is not in failed set then ignore it. should never happen
             BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber());
@@ -116,25 +116,25 @@ class KinesisRecordsManager {
                 }
                 if (emitFailedRecord(collector, failedMessageId)) {
                     failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
-                    config.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
+                    kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
                     return;
                 } else {
-                    LOG.debug("failedMessageEmitted not called on retrier for " + failedMessageId + ". This can happen a few times but should not happen " +
+                    LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId + ". This can happen a few times but should not happen " +
                             "infinitely");
                 }
             } else {
-                LOG.debug("failedPerShard does not contain " + failedMessageId + ". This should never happen.");
+                LOG.warn("failedPerShard does not contain " + failedMessageId + ". This should never happen.");
             }
         }
         LOG.debug("No failed record to emit for now. Hence will try to emit new records");
         // if maximum uncommitted records count has reached, so dont emit any new records and return
-        if (!(getUncommittedRecordsCount() < config.getMaxUncommittedRecords())) {
-            LOG.debug("maximum uncommitted records count has reached. so not emitting any new records and returning");
+        if (!(getUncommittedRecordsCount() < kinesisConfig.getMaxUncommittedRecords())) {
+            LOG.warn("maximum uncommitted records count has reached. so not emitting any new records and returning");
             return;
         }
         // early return as no shard is assigned - probably because number of executors > number of shards
         if (toEmitPerShard.isEmpty()) {
-            LOG.debug("No shard is assigned to this task. Hence not emitting any tuple.");
+            LOG.warn("No shard is assigned to this task. Hence not emitting any tuple.");
             return;
         }
 
@@ -161,7 +161,7 @@ class KinesisRecordsManager {
             failedPerShard.get(shardId).remove(sequenceNumber);
         }
         if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
-            config.getFailedMessageRetryHandler().acked(kinesisMessageId);
+            kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
             failedandFetchedRecords.remove(kinesisMessageId);
         }
         // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
@@ -175,7 +175,7 @@ class KinesisRecordsManager {
         BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
         LOG.debug("Fail received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
         // for a failed message add it to failed set if it will be retried, otherwise ack it; remove from emitted either way
-        if (config.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
+        if (kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
             if (!failedPerShard.containsKey(shardId)) {
                 failedPerShard.put(shardId, new TreeSet<BigInteger>());
             }
@@ -250,14 +250,14 @@ class KinesisRecordsManager {
 
     private String getZkPath (String shardId) {
         String path = "";
-        if (!config.getZkInfo().getZkNode().startsWith("/")) {
+        if (!kinesisConfig.getZkInfo().getZkNode().startsWith("/")) {
             path += "/";
         }
-        path += config.getZkInfo().getZkNode();
-        if (!config.getZkInfo().getZkNode().endsWith("/")) {
+        path += kinesisConfig.getZkInfo().getZkNode();
+        if (!kinesisConfig.getZkInfo().getZkNode().endsWith("/")) {
             path += "/";
         }
-        path += (config.getStreamName() + "/" + shardId);
+        path += (kinesisConfig.getStreamName() + "/" + shardId);
         return path;
     }
 
@@ -311,7 +311,7 @@ class KinesisRecordsManager {
                 // update the shard iterator to next one in case this fetch does not give the message.
                 shardIteratorPerFailedMessage.put(kinesisMessageId, getRecordsResult.getNextShardIterator());
                 if (records.size() == 0) {
-                    LOG.debug("No records returned from kinesis. Hence sleeping for 1 second");
+                    LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
                     Thread.sleep(1000);
                 } else {
                     // add all fetched records to the set of failed records if they are present in failed set
@@ -325,16 +325,16 @@ class KinesisRecordsManager {
                 }
             }
         } catch (InterruptedException ie) {
-            LOG.debug("Thread interrupted while sleeping", ie);
+            LOG.warn("Thread interrupted while sleeping", ie);
         } catch (ExpiredIteratorException ex) {
-            LOG.debug("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
+            LOG.warn("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
             refreshShardIteratorForFailedRecord(kinesisMessageId);
         } catch (ProvisionedThroughputExceededException pe) {
             try {
-                LOG.debug("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
+                LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
-                LOG.debug("Thread interrupted exception", e);
+                LOG.warn("Thread interrupted exception", e);
             }
         }
     }
@@ -353,7 +353,7 @@ class KinesisRecordsManager {
                     // update the shard iterator to next one in case this fetch does not give the message.
                     shardIteratorPerShard.put(shardId, getRecordsResult.getNextShardIterator());
                     if (records.size() == 0) {
-                        LOG.debug("No records returned from kinesis. Hence sleeping for 1 second");
+                        LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
                         Thread.sleep(1000);
                     } else {
                         entry.getValue().addAll(records);
@@ -361,16 +361,16 @@ class KinesisRecordsManager {
                     }
                 }
             } catch (InterruptedException ie) {
-                LOG.debug("Thread interrupted while sleeping", ie);
+                LOG.warn("Thread interrupted while sleeping", ie);
             } catch (ExpiredIteratorException ex) {
-                LOG.debug("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator");
+                LOG.warn("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator");
                 refreshShardIteratorForNewRecords(shardId);
             } catch (ProvisionedThroughputExceededException pe) {
                 try {
-                    LOG.debug("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
+                    LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
                     Thread.sleep(1000);
                 } catch (InterruptedException e) {
-                    LOG.debug("Thread interrupted exception", e);
+                    LOG.warn("Thread interrupted exception", e);
                 }
             }
         }
@@ -380,14 +380,14 @@ class KinesisRecordsManager {
         List<Record> records = new ArrayList<>();
         GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
         getRecordsRequest.setShardIterator(shardIterator);
-        getRecordsRequest.setLimit(config.getKinesisConnectionInfo().getRecordsLimit());
+        getRecordsRequest.setLimit(kinesisConfig.getKinesisConnectionInfo().getRecordsLimit());
         GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
         return getRecordsResult;
     }
 
     private List<Shard> getShards () {
         DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-        describeStreamRequest.setStreamName(config.getStreamName());
+        describeStreamRequest.setStreamName(kinesisConfig.getStreamName());
         List<Shard> shards = new ArrayList<>();
         String exclusiveStartShardId = null;
         do {
@@ -400,7 +400,7 @@ class KinesisRecordsManager {
                 exclusiveStartShardId = null;
             }
         } while ( exclusiveStartShardId != null );
-        LOG.info("Number of shards for stream " + config.getStreamName() + " are " + shards.size());
+        LOG.info("Number of shards for stream " + kinesisConfig.getStreamName() + " are " + shards.size());
         return shards;
     }
 
@@ -410,7 +410,7 @@ class KinesisRecordsManager {
             LinkedList<Record> listOfRecords = entry.getValue();
             Record record;
             while ((record = listOfRecords.pollFirst()) != null) {
-                KinesisMessageId kinesisMessageId = new KinesisMessageId(config.getStreamName(), shardId, record.getSequenceNumber());
+                KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(), shardId, record.getSequenceNumber());
                 if (emitRecord(collector, record, kinesisMessageId)) {
                    return;
                 }
@@ -427,7 +427,7 @@ class KinesisRecordsManager {
 
     private boolean emitRecord (SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
         boolean result = false;
-        List<Object> tuple = config.getRecordToTupleMapper().getTuple(record);
+        List<Object> tuple = kinesisConfig.getRecordToTupleMapper().getTuple(record);
         // if a record is returned put the sequence number in the emittedPerShard to tie back with ack or fail
         if (tuple != null && tuple.size() > 0) {
             collector.emit(tuple, kinesisMessageId);
@@ -438,14 +438,14 @@ class KinesisRecordsManager {
             result = true;
         } else {
             // ack to not process the record again on restart and move on to next message
-            LOG.debug("Record " + record + " did not return a tuple to emit. Hence acking it");
+            LOG.warn("Record " + record + " did not return a tuple to emit. Hence acking it");
             ack(kinesisMessageId);
         }
         return result;
     }
 
     private boolean shouldCommit () {
-        return (System.currentTimeMillis() - lastCommitTime >= config.getZkInfo().getCommitIntervalMs());
+        return (System.currentTimeMillis() - lastCommitTime >= kinesisConfig.getZkInfo().getCommitIntervalMs());
     }
 
     private void initializeFetchedSequenceNumbers () {
@@ -471,12 +471,12 @@ class KinesisRecordsManager {
     private void refreshShardIteratorForNewRecords (String shardId) {
         String shardIterator = null;
         String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId);
-        ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? config.getShardIteratorType() : ShardIteratorType
+        ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? kinesisConfig.getShardIteratorType() : ShardIteratorType
                 .AFTER_SEQUENCE_NUMBER);
         // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = this.getShardIterator(shardId, shardIteratorType, lastFetchedSequenceNumber, config.getTimestamp());
+        shardIterator = this.getShardIterator(shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig.getTimestamp());
         if (shardIterator != null && !shardIterator.isEmpty()) {
-            LOG.debug("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator);
+            LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator);
             shardIteratorPerShard.put(shardId, shardIterator);
         }
     }
@@ -486,7 +486,7 @@ class KinesisRecordsManager {
         // Set the shard iterator for last fetched sequence number to start from correct position in shard
         shardIterator = this.getShardIterator(kinesisMessageId.getShardId(), ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
         if (shardIterator != null && !shardIterator.isEmpty()) {
-            LOG.debug("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
+            LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
             shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
         }
     }
@@ -495,7 +495,7 @@ class KinesisRecordsManager {
         String shardIterator = "";
         try {
             GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
-            getShardIteratorRequest.setStreamName(config.getStreamName());
+            getShardIteratorRequest.setStreamName(kinesisConfig.getStreamName());
             getShardIteratorRequest.setShardId(shardId);
             getShardIteratorRequest.setShardIteratorType(shardIteratorType);
             if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
@@ -508,10 +508,10 @@ class KinesisRecordsManager {
                 shardIterator = getShardIteratorResult.getShardIterator();
             }
         } catch (Exception e) {
-            LOG.debug("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
+            LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
                     sequenceNumber + " timestamp " + timestamp, e);
         }
-        LOG.debug("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
+        LOG.warn("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
                 sequenceNumber + " timestamp" + timestamp);
         return shardIterator;
     }
@@ -544,15 +544,15 @@ class KinesisRecordsManager {
     }
 
     private void initializeCurator () {
-        ZkInfo zkInfo = config.getZkInfo();
+        ZkInfo zkInfo = kinesisConfig.getZkInfo();
         curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
                 RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
         curatorFramework.start();
     }
 
     private void initializeKinesisClient () {
-        kinesisClient = new AmazonKinesisClient(config.getKinesisConnectionInfo().getCredentialsProvider(), config.getKinesisConnectionInfo().getClientConfiguration());
-        kinesisClient.setRegion(Region.getRegion(config.getKinesisConnectionInfo().getRegion()));
+        kinesisClient = new AmazonKinesisClient(kinesisConfig.getKinesisConnectionInfo().getCredentialsProvider(), kinesisConfig.getKinesisConnectionInfo().getClientConfiguration());
+        kinesisClient.setRegion(Region.getRegion(kinesisConfig.getKinesisConnectionInfo().getRegion()));
     }
 
     private void shutdownCurator () {

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
index 1ead4c0..500195b 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
@@ -22,24 +22,22 @@ import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 public class KinesisSpout extends BaseRichSpout {
 
-    private final Config config;
+    private final KinesisConfig kinesisConfig;
     private transient KinesisRecordsManager kinesisRecordsManager;
     private transient SpoutOutputCollector collector;
 
-    public KinesisSpout (Config config) {
-        this.config = config;
+    public KinesisSpout (KinesisConfig kinesisConfig) {
+        this.kinesisConfig = kinesisConfig;
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(config.getRecordToTupleMapper().getOutputFields());
+        declarer.declare(kinesisConfig.getRecordToTupleMapper().getOutputFields());
     }
 
     @Override
@@ -50,7 +48,7 @@ public class KinesisSpout extends BaseRichSpout {
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         this.collector = collector;
-        kinesisRecordsManager = new KinesisRecordsManager(config);
+        kinesisRecordsManager = new KinesisRecordsManager(kinesisConfig);
         kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
index 17bcd6f..a47f0ab 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
@@ -36,13 +36,6 @@ public class ZkInfo implements Serializable {
     // time to sleep between retries in milliseconds
     private final Integer retryIntervalMs;
 
-    /**
-     * Default constructor that uses defaults for a local setup
-     */
-    public ZkInfo () {
-        this("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
-    }
-
     public ZkInfo (String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs, Integer retryAttempts, Integer
             retryIntervalMs) {
         this.zkUrl = zkUrl;
@@ -122,32 +115,4 @@ public class ZkInfo implements Serializable {
                 '}';
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        ZkInfo zkInfo = (ZkInfo) o;
-
-        if (zkUrl != null ? !zkUrl.equals(zkInfo.zkUrl) : zkInfo.zkUrl != null) return false;
-        if (zkNode != null ? !zkNode.equals(zkInfo.zkNode) : zkInfo.zkNode != null) return false;
-        if (sessionTimeoutMs != null ? !sessionTimeoutMs.equals(zkInfo.sessionTimeoutMs) : zkInfo.sessionTimeoutMs != null) return false;
-        if (connectionTimeoutMs != null ? !connectionTimeoutMs.equals(zkInfo.connectionTimeoutMs) : zkInfo.connectionTimeoutMs != null) return false;
-        if (commitIntervalMs != null ? !commitIntervalMs.equals(zkInfo.commitIntervalMs) : zkInfo.commitIntervalMs != null) return false;
-        if (retryAttempts != null ? !retryAttempts.equals(zkInfo.retryAttempts) : zkInfo.retryAttempts != null) return false;
-        return !(retryIntervalMs != null ? !retryIntervalMs.equals(zkInfo.retryIntervalMs) : zkInfo.retryIntervalMs != null);
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = zkUrl != null ? zkUrl.hashCode() : 0;
-        result = 31 * result + (zkNode != null ? zkNode.hashCode() : 0);
-        result = 31 * result + (sessionTimeoutMs != null ? sessionTimeoutMs.hashCode() : 0);
-        result = 31 * result + (connectionTimeoutMs != null ? connectionTimeoutMs.hashCode() : 0);
-        result = 31 * result + (commitIntervalMs != null ? commitIntervalMs.hashCode() : 0);
-        result = 31 * result + (retryAttempts != null ? retryAttempts.hashCode() : 0);
-        result = 31 * result + (retryIntervalMs != null ? retryIntervalMs.hashCode() : 0);
-        return result;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
index eaa0246..1894934 100644
--- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
@@ -21,8 +21,7 @@ public class KinesisBoltTest extends BaseRichBolt {
 
     @Override
     public void execute(Tuple input) {
-        LOG.debug("input = [" + input + "]");
-        System.out.println("input = [" + input + "]");
+        LOG.info("input = [" + input + "]");
         collector.ack(input);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
index d8fca64..2a39463 100644
--- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
@@ -10,6 +10,7 @@ import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.kinesis.spout.CredentialsProviderChain;
 import org.apache.storm.kinesis.spout.ExponentialBackoffRetrier;
+import org.apache.storm.kinesis.spout.KinesisConfig;
 import org.apache.storm.kinesis.spout.KinesisConnectionInfo;
 import org.apache.storm.kinesis.spout.KinesisSpout;
 import org.apache.storm.kinesis.spout.RecordToTupleMapper;
@@ -24,9 +25,10 @@ public class KinesisSpoutTopology {
         RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
         KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
                 1000);
-        org.apache.storm.kinesis.spout.Config config = new org.apache.storm.kinesis.spout.Config(args[1], ShardIteratorType.TRIM_HORIZON,
-                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), new ZkInfo(), kinesisConnectionInfo, 10000L);
-        KinesisSpout kinesisSpout = new KinesisSpout(config);
+        ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+        KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout("spout", kinesisSpout, 3);
         topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");

http://git-wip-us.apache.org/repos/asf/storm/blob/49c47474/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
index f4662b9..03e024a 100644
--- a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
@@ -27,16 +27,15 @@ public class TestRecordToTupleMapper implements RecordToTupleMapper, Serializabl
         tuple.add(record.getPartitionKey());
         tuple.add(record.getSequenceNumber());
         try {
-            System.out.println("bytebuffer is " + record.getData());
             String data = decoder.decode(record.getData()).toString();
-            System.out.println("data is " + data);
+            LOG.info("data is " + data);
             tuple.add(data);
         } catch (CharacterCodingException e) {
             e.printStackTrace();
-            System.out.println("Exception occured. Emitting tuple with empty string data");
+            LOG.warn("Exception occured. Emitting tuple with empty string data", e);
             tuple.add("");
         }
-        System.out.println("Tuple from record is " + tuple);
+        LOG.info("Tuple from record is " + tuple);
         return tuple;
     }
 }


[2/6] storm git commit: STORM-1839: Storm spout implementation for Amazon Kinesis Streams.

Posted by sr...@apache.org.
STORM-1839: Storm spout implementation for Amazon Kinesis Streams.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de68c267
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de68c267
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de68c267

Branch: refs/heads/master
Commit: de68c267fcb7555c7729c9377d3f6d1e504ec25e
Parents: 8115ef5
Author: Priyank <ps...@hortonworks.com>
Authored: Tue Jul 12 12:17:54 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Sat Jul 23 00:09:45 2016 -0700

----------------------------------------------------------------------
 external/storm-kinesis/README.md                | 139 +++++
 external/storm-kinesis/pom.xml                  |  69 +++
 .../org/apache/storm/kinesis/spout/Config.java  | 166 ++++++
 .../kinesis/spout/CredentialsProviderChain.java |  39 ++
 .../spout/ExponentialBackoffRetrier.java        | 164 ++++++
 .../spout/FailedMessageRetryHandler.java        |  48 ++
 .../kinesis/spout/KinesisConnectionInfo.java    | 137 +++++
 .../storm/kinesis/spout/KinesisMessageId.java   |  73 +++
 .../kinesis/spout/KinesisRecordsManager.java    | 566 +++++++++++++++++++
 .../storm/kinesis/spout/KinesisSpout.java       |  88 +++
 .../kinesis/spout/RecordToTupleMapper.java      |  38 ++
 .../org/apache/storm/kinesis/spout/ZkInfo.java  | 153 +++++
 .../kinesis/spout/test/KinesisBoltTest.java     |  32 ++
 .../spout/test/KinesisSpoutTopology.java        |  38 ++
 .../spout/test/TestRecordToTupleMapper.java     |  42 ++
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 17 files changed, 1807 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/README.md b/external/storm-kinesis/README.md
new file mode 100644
index 0000000..8ff2816
--- /dev/null
+++ b/external/storm-kinesis/README.md
@@ -0,0 +1,139 @@
+# Storm Kinesis Spout
+Provides core storm spout for consuming data from a stream in Amazon Kinesis Streams. It stores the sequence numbers that can be committed in zookeeper and 
+starts consuming records after that sequence number on restart by default. Below is the code sample to create a sample topology that uses the spout. Each 
+object used in configuring the spout is explained below. Ideally, the number of spout tasks should be equal to number of shards in kinesis. However each task 
+can read from more than one shard.
+
+```java
+public class KinesisSpoutTopology {
+    public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
+        String topologyName = args[0];
+        RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+        KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+                1000);
+        org.apache.storm.kinesis.spout.Config config = new org.apache.storm.kinesis.spout.Config(args[1], ShardIteratorType.TRIM_HORIZON,
+                recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), new ZkInfo(), kinesisConnectionInfo, 10000L);
+        KinesisSpout kinesisSpout = new KinesisSpout(config);
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("spout", kinesisSpout, 3);
+        topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+        Config topologyConfig = new Config();
+        topologyConfig.setDebug(true);
+        topologyConfig.setNumWorkers(3);
+        StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+    }
+}
+```
+As you can see above the spout takes an object of Config in its constructor. The constructor of Config takes 8 objects as explained below.
+
+#### `String` streamName
+name of kinesis stream to consume data from
+
+#### `ShardIteratorType` shardIteratorType
+3 types are supported - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. By default this argument is ignored if state for shards 
+is found in zookeeper. Hence they will apply the first time a topology is started. If you want to use any of these in subsequent runs of the topology, you 
+will need to clear the state of zookeeper node used for storing sequence numbers
+#### `RecordToTupleMapper` recordToTupleMapper
+an implementation of `RecordToTupleMapper` interface that spout will call to convert a kinesis record to a storm tuple. It has two methods. getOutputFields 
+tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked
+```java
+    Fields getOutputFields ();
+    List<Object> getTuple (Record record);
+```
+
+#### `Date` timestamp
+used in conjunction with the AT_TIMESTAMP shardIteratorType argument. This will make the spout fetch records from kinesis starting at that time or later. The
+time used by kinesis is the server side time associated to the record by kinesis
+
+#### `FailedMessageRetryHandler` failedMessageRetryHandler
+an implementation of the `FailedMessageRetryHandler` interface. By default this module provides an implementation that supports an exponential backoff retry
+mechanism for failed messages. That implementation has two constructors. Default no args constructor will configure first retry at 100 milliseconds and 
+subsequent retries at Math.pow(2, i-1) where i is the retry number in the range 2 to Long.MAX_VALUE. 2 represents the base for exponential function in seconds. 
+Other constructor takes retry interval in millis for first retry as first argument, base for exponential function in seconds as second argument and number of 
+retries as third argument. The methods of this interface and their working in accord with the spout are explained below
+```java
+    boolean failed (KinesisMessageId messageId);
+    KinesisMessageId getNextFailedMessageToRetry ();
+    void failedMessageEmitted (KinesisMessageId messageId);
+    void acked (KinesisMessageId messageId);
+```
+failed method will be called on every tuple that failed in the spout. It should return true if that failed message is scheduled to be retried, false otherwise.
+
+getNextFailedMessageToRetry method will be called the first thing every time a spout wants to emit a tuple. It should return a message that should be retried
+if any or null otherwise. Note that it can return null in the case it does not have any message to retry as of that moment. However, it should eventually 
+return every message for which it returned true when failed method was called for that message
+
+failedMessageEmitted will be called if spout successfully manages to get the record from kinesis and emit it. If not, the implementation should return the same 
+message when getNextFailedMessageToRetry is called again
+
+acked will be called once the failed message was re-emitted and successfully acked by the spout. If it was failed by the spout, failed will be called again
+
+#### `ZkInfo` zkInfo
+an object encapsulating information for zookeeper interaction. It has two constructors. The non-default constructor takes zkUrl as first argument which 
+is a comma separated string of zk host and port, zkNode as second that will be used as the root node for storing committed sequence numbers, session timeout
+as third in milliseconds, connection timeout as fourth in milliseconds, commit interval as fifth in milliseconds for committing sequence numbers to zookeeper, 
+retry attempts as sixth for zk client connection retry attempts, retry interval as seventh in milliseconds for time to wait before retrying to connect. Default 
+constructor uses the values ["localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000]
+
+#### `KinesisConnectionInfo` kinesisConnectionInfo
+an object that captures arguments for connecting to kinesis using kinesis client. It has a constructor that takes an implementation of `AWSCredentialsProvider`
+as first argument. This module provides an implementation called `CredentialsProviderChain` that allows the spout to authenticate with kinesis using one of 
+the 5 mechanisms in this order - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`, 
+`InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`. It takes an object of `ClientConfiguration` as second argument for configuring the kinesis 
+client, `Regions` as third argument that sets the region to connect to on the client and recordsLimit as the fourth argument which represents the maximum number
+of records kinesis client will retrieve for every GetRecords request. This limit should be carefully chosen based on the size of the record, kinesis 
+throughput rate limits and per tuple latency in storm for the topology. Also if one task will be reading from more than one shard then that will also affect
+the choice of limit argument
+
+#### `Long` maxUncommittedRecords
+this represents the maximum number of uncommitted sequence numbers allowed per task. Once this number is reached spout will not fetch any new records from 
+kinesis. Uncommitted sequence numbers are defined as the sum of all the messages for a task that have not been committed to zookeeper. This is different from 
+topology level max pending messages. For example if this value is set to 10, and the spout emitted sequence numbers from 1 to 10. Sequence number 1 is pending 
+and 2 to 10 acked. In that case the number of uncommitted sequence numbers is 10 since no sequence number in the range 1 to 10 can be committed to zk. 
+However, storm can still call next tuple on the spout because there is only 1 pending message
+ 
+### Maven dependencies
+AWS SDK version that this was tested with is 1.10.77
+
+```xml
+ <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>${aws-java-sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+ </dependencies>
+```
+
+# Future Work
+Handle merging or splitting of shards in kinesis, Trident spout implementation and metrics
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml
new file mode 100644
index 0000000..f1872dc
--- /dev/null
+++ b/external/storm-kinesis/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+     http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>storm-kinesis</artifactId>
+    <name>storm-kinesis</name>
+
+    <properties>
+        <aws-java-sdk.version>1.10.77</aws-java-sdk.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>${aws-java-sdk.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
+

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
new file mode 100644
index 0000000..5be8de9
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class Config implements Serializable {
+    // kinesis stream name to read from
+    private final String streamName;
+    // shard iterator type based on kinesis api - beginning of time, latest, at timestamp are only supported
+    private final ShardIteratorType shardIteratorType;
+    // implementation for converting a Kinesis record to a storm tuple
+    private final RecordToTupleMapper recordToTupleMapper;
+    // timestamp to be used for shardIteratorType AT_TIMESTAMP - can be null
+    private final Date timestamp;
+    // implementation for handling the failed messages retry logic
+    private final FailedMessageRetryHandler failedMessageRetryHandler;
+    // object capturing all zk related information for storing committed sequence numbers
+    private final ZkInfo zkInfo;
+    // object representing information on parameters to use while connecting to kinesis using kinesis client
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    // this number represents the number of messages that are still not committed to zk. it will prevent the spout from emitting further.
+    // for e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still cant be committed to zk because 1 is still in failed set. As a result
+    // the acked queue can infinitely grow without any of them being committed to zk. topology max pending does not help since from storm's view they are acked
+    private final Long maxUncommittedRecords;
+
+    public Config (String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp, FailedMessageRetryHandler
+            failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) {
+        this.streamName = streamName;
+        this.shardIteratorType = shardIteratorType;
+        this.recordToTupleMapper = recordToTupleMapper;
+        this.timestamp = timestamp;
+        this.failedMessageRetryHandler = failedMessageRetryHandler;
+        this.zkInfo = zkInfo;
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+        this.maxUncommittedRecords = maxUncommittedRecords;
+        validate();
+    }
+
+    private void validate () {
+        if (streamName == null || streamName.length() < 1) {
+            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
+        }
+        if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType
+                .AT_SEQUENCE_NUMBER)) {
+            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST +
+                    "," + ShardIteratorType.TRIM_HORIZON);
+        }
+        if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) {
+            throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
+        }
+        if (recordToTupleMapper == null) {
+            throw new IllegalArgumentException("recordToTupleMapper cannot be null");
+        }
+        if (failedMessageRetryHandler == null) {
+            throw new IllegalArgumentException("failedMessageRetryHandler cannot be null");
+        }
+        if (zkInfo == null) {
+            throw new IllegalArgumentException("zkInfo cannot be null");
+        }
+        if (kinesisConnectionInfo == null) {
+            throw new IllegalArgumentException("kinesisConnectionInfo cannot be null");
+        }
+        if (maxUncommittedRecords == null || maxUncommittedRecords < 1) {
+            throw new IllegalArgumentException("maxUncommittedRecords has to be a positive integer");
+        }
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public ShardIteratorType getShardIteratorType() {
+        return shardIteratorType;
+    }
+
+    public RecordToTupleMapper getRecordToTupleMapper() {
+        return recordToTupleMapper;
+    }
+
+    public Date getTimestamp() {
+        return timestamp;
+    }
+
+    public FailedMessageRetryHandler getFailedMessageRetryHandler () {
+        return failedMessageRetryHandler;
+    }
+
+    public ZkInfo getZkInfo () {
+        return zkInfo;
+    }
+
+    public KinesisConnectionInfo getKinesisConnectionInfo () {
+        return kinesisConnectionInfo;
+    }
+
+    public Long getMaxUncommittedRecords () {
+        return maxUncommittedRecords;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Config config = (Config) o;
+
+        if (streamName != null ? !streamName.equals(config.streamName) : config.streamName != null) return false;
+        if (shardIteratorType != config.shardIteratorType) return false;
+        if (recordToTupleMapper != null ? !recordToTupleMapper.equals(config.recordToTupleMapper) : config.recordToTupleMapper != null) return false;
+        if (timestamp != null ? !timestamp.equals(config.timestamp) : config.timestamp != null) return false;
+        if (zkInfo != null ? !zkInfo.equals(config.zkInfo) : config.zkInfo != null) return false;
+        if (kinesisConnectionInfo != null ? !kinesisConnectionInfo.equals(config.kinesisConnectionInfo) : config.kinesisConnectionInfo != null) return false;
+        if (maxUncommittedRecords != null ? !maxUncommittedRecords.equals(config.maxUncommittedRecords) : config.maxUncommittedRecords != null) return false;
+        return !(failedMessageRetryHandler != null ? !failedMessageRetryHandler.equals(config.failedMessageRetryHandler) : config.failedMessageRetryHandler
+                != null);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = streamName != null ? streamName.hashCode() : 0;
+        result = 31 * result + (shardIteratorType != null ? shardIteratorType.hashCode() : 0);
+        result = 31 * result + (recordToTupleMapper != null ? recordToTupleMapper.hashCode() : 0);
+        result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
+        result = 31 * result + (zkInfo != null ? zkInfo.hashCode() : 0);
+        result = 31 * result + (kinesisConnectionInfo != null ? kinesisConnectionInfo.hashCode() : 0);
+        result = 31 * result + (failedMessageRetryHandler != null ? failedMessageRetryHandler.hashCode() : 0);
+        result = 31 * result + (maxUncommittedRecords != null ? maxUncommittedRecords.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "Config{" +
+                "streamName='" + streamName + '\'' +
+                ", shardIteratorType=" + shardIteratorType +
+                ", recordToTupleMapper=" + recordToTupleMapper +
+                ", timestamp=" + timestamp +
+                ", zkInfo=" + zkInfo +
+                ", kinesisConnectionInfo=" + kinesisConnectionInfo +
+                ", failedMessageRetryHandler =" + failedMessageRetryHandler +
+                ", maxUncommittedRecords=" + maxUncommittedRecords +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
new file mode 100644
index 0000000..4287ae0
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+
+/**
+ * Class representing chain of mechanisms that will be used in order to connect to kinesis
+ */
+public class CredentialsProviderChain extends AWSCredentialsProviderChain {
+    public CredentialsProviderChain () {
+        super(new EnvironmentVariableCredentialsProvider(),
+                new SystemPropertiesCredentialsProvider(),
+                new ClasspathPropertiesFileCredentialsProvider(),
+                new InstanceProfileCredentialsProvider(),
+                new ProfileCredentialsProvider());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
new file mode 100644
index 0000000..f357f30
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * {@link FailedMessageRetryHandler} that retries failed messages with an exponentially growing delay.
+ * The first retry is scheduled initialDelayMillis after the failure; retry i (i = 2,3,...) is scheduled Math.pow(baseSeconds, i-1) seconds
+ * after the failure. A message that fails more than maxRetries times is no longer scheduled. All retry times are based on System.nanoTime().
+ * The class keeps plain (unsynchronized) collections, so it does no locking of its own.
+ */
+public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+    // Wait interval for retrying after first failure
+    private final Long initialDelayMillis;
+    // Base for exponential function in seconds for retrying for second, third and so on failures
+    private final Long baseSeconds;
+    // Maximum number of retries
+    private final Long maxRetries;
+    // map to track number of failures for each kinesis message that failed
+    private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+    // map to track next retry time (in System.nanoTime() units) for each kinesis message that is waiting to be retried
+    private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+    // sorted set of records to be retried based on retry time. earliest retryTime record comes first.
+    // Invariant: a message is in this set if and only if it has an entry in retryTimes, and that entry must not change while the message is
+    // in the set since the comparator reads it
+    private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
+
+    /**
+     * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of
+     * Math.pow(2,i-1) secs for retry i where i = 2,3,...
+     */
+    public ExponentialBackoffRetrier () {
+        this(100L, 2L, Long.MAX_VALUE);
+    }
+
+    /**
+     *
+     * @param initialDelayMillis delay in milliseconds for first retry
+     * @param baseSeconds base for exponent function in seconds
+     * @param maxRetries maximum number of retries before the record is discarded/acked
+     * @throws IllegalArgumentException if any of the arguments is negative
+     */
+    public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
+        this.initialDelayMillis = initialDelayMillis;
+        this.baseSeconds = baseSeconds;
+        this.maxRetries = maxRetries;
+        validate();
+    }
+
+    // rejects negative configuration values
+    private void validate () {
+        if (initialDelayMillis < 0) {
+            throw new IllegalArgumentException("initialDelayMillis cannot be negative." );
+        }
+        if (baseSeconds < 0) {
+            throw new IllegalArgumentException("baseSeconds cannot be negative.");
+        }
+        if (maxRetries < 0) {
+            throw new IllegalArgumentException("maxRetries cannot be negative.");
+        }
+    }
+
+    /**
+     * Records a failure for messageId and schedules it for retry unless retries are disabled or exhausted.
+     * @param messageId id of the message that failed
+     * @return true if the message was scheduled to be retried, false if it was dropped
+     */
+    @Override
+    public boolean failed(KinesisMessageId messageId) {
+        LOG.debug("Handling failed message {}", messageId);
+        // if maxRetries is 0, dont retry and return false as per interface contract
+        if (maxRetries == 0) {
+            LOG.debug("maxRetries set to 0. Hence not queueing {}", messageId);
+            return false;
+        }
+        // a message that is still queued must leave the sorted set before its retry time changes, otherwise the set ordering gets corrupted
+        unschedule(messageId);
+        // first failure starts at 0, so the count after this failure is previous + 1
+        Long previousFailures = failCounts.get(messageId);
+        long failCount = (previousFailures == null ? 0L : previousFailures) + 1;
+        // if fail count is greater than maxRetries, discard or ack. for e.g. for maxRetries 3, 4 failures are allowed at maximum
+        if (failCount > maxRetries) {
+            LOG.debug("maxRetries reached so dropping {}", messageId);
+            failCounts.remove(messageId);
+            return false;
+        }
+        failCounts.put(messageId, failCount);
+        // if reached so far, add it to the set of messages waiting to be retried with next retry time based on how many times it failed.
+        // retry time has to be in the map before the message goes in the set because the comparator looks it up
+        long retryTime = getRetryTime(failCount);
+        retryTimes.put(messageId, retryTime);
+        retryMessageSet.add(messageId);
+        LOG.debug("Scheduled {} for retry at {} and retry attempt {}", messageId, retryTime, failCount);
+        return true;
+    }
+
+    /**
+     * Clears all state kept for messageId.
+     * @param messageId id of the message that was acked
+     */
+    @Override
+    public void acked(KinesisMessageId messageId) {
+        // message was acked after being retried. so clear the state for that message
+        LOG.debug("Ack received for {}. Hence cleaning state.", messageId);
+        failCounts.remove(messageId);
+        // normally a no-op since an emitted message is not queued anymore; guards against a stale entry being handed out again
+        unschedule(messageId);
+    }
+
+    /**
+     * @return the queued message with the earliest retry time if that time has been reached, null otherwise. The message stays queued
+     * until {@link #failedMessageEmitted(KinesisMessageId)} is called for it
+     */
+    @Override
+    public KinesisMessageId getNextFailedMessageToRetry() {
+        KinesisMessageId result = null;
+        // only the head of the set needs to be looked at since it has the earliest retry time
+        if (!retryMessageSet.isEmpty()) {
+            KinesisMessageId earliest = retryMessageSet.first();
+            if (retryTimes.get(earliest) <= System.nanoTime()) {
+                result = earliest;
+            }
+        }
+        LOG.debug("Returning {} to spout for retrying.", result);
+        return result;
+    }
+
+    /**
+     * Removes messageId from the retry queue but keeps its fail count, so that a later failure backs off further and an ack clears it.
+     * @param messageId id of the message that was emitted again by the spout
+     */
+    @Override
+    public void failedMessageEmitted(KinesisMessageId messageId) {
+        // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and wait for its ack or fail
+        // but still keep it in counts map to retry again on failure or remove on ack
+        LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", messageId);
+        unschedule(messageId);
+    }
+
+    // removes the message from the retry queue if it is queued. Set removal comes first because the comparator needs the retryTimes entry
+    private void unschedule (KinesisMessageId messageId) {
+        if (retryTimes.containsKey(messageId)) {
+            retryMessageSet.remove(messageId);
+            retryTimes.remove(messageId);
+        }
+    }
+
+    // private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to Long.MAX_VALUE)
+    private long getRetryTime (long retryNum) {
+        final long now = System.nanoTime();
+        final long nanoMultiplierForMillis = 1000000L;
+        // if first retry then retry time  = current time  + initial delay
+        if (retryNum == 1) {
+            return now + initialDelayMillis * nanoMultiplierForMillis;
+        }
+        // else use the exponential backoff logic and handle long overflow
+        double delayNanos = Math.pow(baseSeconds, retryNum - 1) * 1000 * nanoMultiplierForMillis;
+        // if delay or delay + current time are bigger than long max value
+        // second predicate for or condition uses the fact that long addition over the limit circles back
+        if ((delayNanos >= (double) Long.MAX_VALUE) || ((now + (long) delayNanos) < now)) {
+            return Long.MAX_VALUE;
+        }
+        return now + (long) delayNanos;
+    }
+
+    /**
+     * Orders queued messages by retry time, earliest first. Ties are broken on stream name, shard id and sequence number so that the
+     * comparator returns 0 only for messages that are equal; a TreeSet treats compare == 0 as "same element" and would otherwise drop a
+     * message whose retry time happens to equal that of another queued message.
+     */
+    private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> {
+        @Override
+        public int compare(KinesisMessageId o1, KinesisMessageId o2) {
+            int result = retryTimes.get(o1).compareTo(retryTimes.get(o2));
+            if (result == 0) {
+                result = compareNullable(o1.getStreamName(), o2.getStreamName());
+            }
+            if (result == 0) {
+                result = compareNullable(o1.getShardId(), o2.getShardId());
+            }
+            if (result == 0) {
+                result = compareNullable(o1.getSequenceNumber(), o2.getSequenceNumber());
+            }
+            return result;
+        }
+
+        // null safe string comparison, null sorts first
+        private int compareNullable (String left, String right) {
+            if (left == null) {
+                return right == null ? 0 : -1;
+            }
+            if (right == null) {
+                return 1;
+            }
+            return left.compareTo(right);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
new file mode 100644
index 0000000..bb0e450
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+/**
+ * Contract between the spout and the component that decides if and when a failed message is retried.
+ */
+public interface FailedMessageRetryHandler extends Serializable {
+    /**
+     * message with messageId failed in the spout
+     * @param messageId id of the message that failed
+     * @return true if this failed message was scheduled to be retried, false otherwise
+     */
+    boolean failed (KinesisMessageId messageId);
+
+    /**
+     * message with messageId succeeded/acked in the spout
+     * @param messageId id of the message that was acked
+     */
+    void acked (KinesisMessageId messageId);
+
+    /**
+     * Get the next failed message's id to retry if any, null otherwise
+     * @return messageId of the next message to retry, or null if there is none
+     */
+    KinesisMessageId getNextFailedMessageToRetry ();
+
+    /**
+     * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout
+     * @param messageId id of the message that was emitted again
+     */
+    void failedMessageEmitted (KinesisMessageId messageId);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
new file mode 100644
index 0000000..5d9454a
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Serializable holder of everything needed to connect to kinesis: credentials provider, client configuration, region and the records limit
+ * per getRecords request. The credentials provider and the client configuration are turned into byte arrays with kryo in the constructor and
+ * are deserialized lazily on first access through the getters, so only the byte arrays, region and records limit take part in java
+ * serialization, equals and hashCode.
+ */
+public class KinesisConnectionInfo implements Serializable {
+    private final byte[] serializedKinesisCredsProvider;
+    private final byte[] serializedkinesisClientConfig;
+    private final Integer recordsLimit;
+    private final Regions region;
+
+    // lazily rebuilt from the byte arrays above, see the getters
+    private transient AWSCredentialsProvider credentialsProvider;
+    private transient ClientConfiguration clientConfiguration;
+
+    /**
+     *
+     * @param credentialsProvider implementation to provide credentials to connect to kinesis
+     * @param clientConfiguration client configuration to pass to kinesis client
+     * @param region region to connect to
+     * @param recordsLimit max records to be fetched in a getRecords request to kinesis
+     * @throws IllegalArgumentException if recordsLimit is null or not positive, or if region is null
+     */
+    public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) {
+        if (recordsLimit == null || recordsLimit <= 0) {
+            throw new IllegalArgumentException("recordsLimit has to be a positive integer");
+        }
+        if (region == null) {
+            throw new IllegalArgumentException("region cannot be null");
+        }
+        serializedKinesisCredsProvider = getKryoSerializedBytes(credentialsProvider);
+        serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration);
+        this.recordsLimit = recordsLimit;
+        this.region = region;
+
+        // the passed in objects are deliberately not kept; getters always work off the serialized form
+        this.credentialsProvider = null;
+        this.clientConfiguration = null;
+    }
+
+    public Integer getRecordsLimit() {
+        return recordsLimit;
+    }
+
+    /**
+     * @return credentials provider deserialized from the bytes captured in the constructor; deserialized once and then cached
+     */
+    public AWSCredentialsProvider getCredentialsProvider() {
+        if (credentialsProvider == null) {
+            credentialsProvider = (AWSCredentialsProvider) this.getKryoDeserializedObject(serializedKinesisCredsProvider);
+        }
+        return credentialsProvider;
+    }
+
+    /**
+     * @return client configuration deserialized from the bytes captured in the constructor; deserialized once and then cached
+     */
+    public ClientConfiguration getClientConfiguration() {
+        if (clientConfiguration == null) {
+            clientConfiguration = (ClientConfiguration) this.getKryoDeserializedObject(serializedkinesisClientConfig);
+        }
+        return clientConfiguration;
+    }
+
+    public Regions getRegion() {
+        return region;
+    }
+
+    // writes class information and the object with a fresh kryo instance and returns the resulting bytes
+    private byte[] getKryoSerializedBytes (final Object obj) {
+        final Kryo kryo = new Kryo();
+        final ByteArrayOutputStream os = new ByteArrayOutputStream();
+        final Output output = new Output(os);
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+        kryo.writeClassAndObject(output, obj);
+        // flush is needed before reading the bytes since Output buffers
+        output.flush();
+        return os.toByteArray();
+    }
+
+    // inverse of getKryoSerializedBytes
+    private Object getKryoDeserializedObject (final byte[] ser) {
+        final Kryo kryo = new Kryo();
+        final Input input = new Input(new ByteArrayInputStream(ser));
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+        return kryo.readClassAndObject(input);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        KinesisConnectionInfo that = (KinesisConnectionInfo) o;
+
+        if (!Arrays.equals(serializedKinesisCredsProvider, that.serializedKinesisCredsProvider)) return false;
+        if (!Arrays.equals(serializedkinesisClientConfig, that.serializedkinesisClientConfig)) return false;
+        if (region != null ? !region.equals(that.region) : that.region != null) return false;
+        return !(recordsLimit != null ? !recordsLimit.equals(that.recordsLimit) : that.recordsLimit != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = serializedKinesisCredsProvider != null ? Arrays.hashCode(serializedKinesisCredsProvider) : 0;
+        result = 31 * result + (serializedkinesisClientConfig != null ? Arrays.hashCode(serializedkinesisClientConfig) : 0);
+        result = 31 * result + (region != null ? region.hashCode() : 0);
+        result = 31 * result + (recordsLimit != null ? recordsLimit.hashCode() : 0);
+        return result;
+    }
+
+    /**
+     * The serialized credentials provider and client configuration can contain secrets (for e.g. keys held by the provider), so only their
+     * sizes are printed and never their content.
+     */
+    @Override
+    public String toString() {
+        return "KinesisConnectionInfo{" +
+                "serializedKinesisCredsProvider=<" + byteCount(serializedKinesisCredsProvider) + " bytes>" +
+                ", serializedkinesisClientConfig=<" + byteCount(serializedkinesisClientConfig) + " bytes>" +
+                ", region=" + region +
+                ", recordsLimit=" + recordsLimit +
+                '}';
+    }
+
+    // null safe length, used by toString only
+    private static int byteCount (byte[] bytes) {
+        return bytes == null ? 0 : bytes.length;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
new file mode 100644
index 0000000..dd239f1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+/**
+ * Immutable id of a kinesis record made up of stream name, shard id and sequence number. Two ids are equal when all three parts are equal.
+ */
+public class KinesisMessageId {
+    private final String streamName;
+    private final String shardId;
+    private final String sequenceNumber;
+
+    KinesisMessageId (String streamName, String shardId, String sequenceNumber) {
+        this.streamName = streamName;
+        this.shardId = shardId;
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    public String getStreamName () {
+        return streamName;
+    }
+
+    public String getShardId () {
+        return shardId;
+    }
+
+    public String getSequenceNumber () {
+        return sequenceNumber;
+    }
+
+    @Override
+    public String toString () {
+        StringBuilder text = new StringBuilder("KinesisMessageId{");
+        text.append("streamName='").append(streamName).append('\'');
+        text.append(", shardId='").append(shardId).append('\'');
+        text.append(", sequenceNumber='").append(sequenceNumber).append('\'');
+        return text.append('}').toString();
+    }
+
+    @Override
+    public boolean equals (Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KinesisMessageId other = (KinesisMessageId) o;
+        return sameText(streamName, other.streamName)
+                && sameText(shardId, other.shardId)
+                && sameText(sequenceNumber, other.sequenceNumber);
+    }
+
+    @Override
+    public int hashCode () {
+        // same 31 based combination of the three parts, a null part counts as 0
+        int hash = textHash(streamName);
+        hash = 31 * hash + textHash(shardId);
+        hash = 31 * hash + textHash(sequenceNumber);
+        return hash;
+    }
+
+    // null safe equality of two strings
+    private static boolean sameText (String left, String right) {
+        return left == null ? right == null : left.equals(right);
+    }
+
+    // null safe hash of a string
+    private static int textHash (String text) {
+        return text == null ? 0 : text.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
new file mode 100644
index 0000000..df2f97d
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
+    // zk interaction object
+    private transient CuratorFramework curatorFramework;
+    // Kinesis Spout Config object
+    private transient final Config config;
+    // Queue of records per shard fetched from kinesis and are waiting to be emitted. Its key set is the set of shards assigned to this task
+    private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
+    // Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted
+    private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>();
+    // Sequence numbers per shard that have been emitted. TreeSet as we need to remove on ack or fail. At the same time order is needed to figure out the
+    // sequence number to commit. Logic explained in commit
+    private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
+    // sorted acked sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
+    // sorted failed sequence numbers - needed to figure out what sequence number can be committed
+    private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for new messages
+    private transient Map<String, String> shardIteratorPerShard = new HashMap<>();
+    // last fetched sequence number corresponding to position in shard
+    private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>();
+    // shard iterator corresponding to position in shard for failed messages
+    private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>();
+    // timestamp (System.currentTimeMillis) of the last commit, used to decide when to commit to zk again
+    private transient long lastCommitTime;
+    // boolean to track deactivated state
+    private transient boolean deactivated;
+    private transient AmazonKinesisClient kinesisClient;
+
+    // only stores the config; clients are created later in initialize
+    KinesisRecordsManager (Config config) {
+        this.config = config;
+    }
+
+    /**
+     * Sets up the kinesis client and curator, assigns shards to this task and initializes the per shard read positions.
+     * Shards are assigned round robin: this task takes the shards at index myTaskIndex, myTaskIndex + totalTasks, and so on.
+     * @param myTaskIndex index of this task
+     * @param totalTasks total number of tasks
+     */
+    void initialize (int myTaskIndex, int totalTasks) {
+        deactivated = false;
+        lastCommitTime = System.currentTimeMillis();
+        initializeKinesisClient();
+        initializeCurator();
+        List<Shard> shards = this.getShards();
+        LOG.info("myTaskIndex is " + myTaskIndex);
+        LOG.info("totalTasks is " + totalTasks);
+        for (int shardIndex = myTaskIndex; shardIndex < shards.size(); shardIndex += totalTasks) {
+            String assignedShardId = shards.get(shardIndex).getShardId();
+            LOG.info("Shard id " + assignedShardId + " assigned to task " + myTaskIndex);
+            // an empty queue marks the shard as owned by this task
+            toEmitPerShard.put(assignedShardId, new LinkedList<Record>());
+        }
+        initializeFetchedSequenceNumbers();
+        refreshShardIteratorsForNewRecords();
+    }
+
+    /**
+     * Emits at most one record: a failed record that is due for retry if there is one, otherwise a new record.
+     * Commits first if shouldCommit says so. New records are not emitted when the uncommitted records count has reached the configured
+     * maximum or when no shard is assigned to this task.
+     * @param collector collector handed to the emit helpers
+     */
+    void next (SpoutOutputCollector collector) {
+        if (shouldCommit()) {
+            commit();
+        }
+        KinesisMessageId failedMessageId = config.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
+        if (failedMessageId  != null) {
+            // if the retry service returns a message that is not in failed set then ignore it. should never happen
+            BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber());
+            if (failedPerShard.containsKey(failedMessageId.getShardId()) && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) {
+                // fetch the record again only if it is not already waiting from an earlier attempt
+                if (!failedandFetchedRecords.containsKey(failedMessageId)) {
+                    fetchFailedRecords(failedMessageId);
+                }
+                if (emitFailedRecord(collector, failedMessageId)) {
+                    // emitted: no longer failed, and the retrier is told so that it stops handing this message out
+                    failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
+                    config.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
+                    return;
+                } else {
+                    LOG.debug("failedMessageEmitted not called on retrier for " + failedMessageId + ". This can happen a few times but should not happen " +
+                            "infinitely");
+                }
+            } else {
+                LOG.debug("failedPerShard does not contain " + failedMessageId + ". This should never happen.");
+            }
+        }
+        LOG.debug("No failed record to emit for now. Hence will try to emit new records");
+        // if maximum uncommitted records count has reached, so dont emit any new records and return
+        if (!(getUncommittedRecordsCount() < config.getMaxUncommittedRecords())) {
+            LOG.debug("maximum uncommitted records count has reached. so not emitting any new records and returning");
+            return;
+        }
+        // early return as no shard is assigned - probably because number of executors > number of shards
+        if (toEmitPerShard.isEmpty()) {
+            LOG.debug("No shard is assigned to this task. Hence not emitting any tuple.");
+            return;
+        }
+
+        if (shouldFetchNewRecords()) {
+            fetchNewRecords();
+        }
+        emitNewRecord(collector);
+    }
+
+    /**
+     * Marks the message as acked: its sequence number is added to the acked set of its shard and removed from the emitted and failed sets.
+     * If the record had been fetched again because of a failure, the retry handler is notified and the fetched record is dropped.
+     * @param kinesisMessageId id of the acked message
+     */
+    void ack (KinesisMessageId kinesisMessageId) {
+        final String shardId = kinesisMessageId.getShardId();
+        final BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
+        LOG.debug("Ack received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        // acked set for the shard is created on first ack
+        TreeSet<BigInteger> ackedForShard;
+        if (ackedPerShard.containsKey(shardId)) {
+            ackedForShard = ackedPerShard.get(shardId);
+        } else {
+            ackedForShard = new TreeSet<BigInteger>();
+            ackedPerShard.put(shardId, ackedForShard);
+        }
+        ackedForShard.add(sequenceNumber);
+        // an acked sequence number is neither in flight nor failed anymore
+        if (emittedPerShard.containsKey(shardId)) {
+            emittedPerShard.get(shardId).remove(sequenceNumber);
+        }
+        if (failedPerShard.containsKey(shardId)) {
+            failedPerShard.get(shardId).remove(sequenceNumber);
+        }
+        // retrier only needs to hear about acks for records that went through the retry path
+        if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
+            config.getFailedMessageRetryHandler().acked(kinesisMessageId);
+            failedandFetchedRecords.remove(kinesisMessageId);
+        }
+        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
+        if (deactivated) {
+            commit();
+        }
+    }
+
+    /**
+     * Handles a failed message. If the retry handler schedules it for retry, its sequence number moves from the emitted set to the failed
+     * set of its shard. Otherwise the message is given up on and treated as acked.
+     * @param kinesisMessageId id of the failed message
+     */
+    void fail (KinesisMessageId kinesisMessageId) {
+        String shardId = kinesisMessageId.getShardId();
+        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
+        LOG.debug("Fail received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        // for a failed message add it to failed set if it will be retried, otherwise ack it; remove from emitted either way
+        if (config.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
+            if (!failedPerShard.containsKey(shardId)) {
+                failedPerShard.put(shardId, new TreeSet<BigInteger>());
+            }
+            failedPerShard.get(shardId).add(sequenceNumber);
+            // same guard as in ack: there may be no emitted set for this shard, in which case there is nothing to remove
+            TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+            if (emitted != null) {
+                emitted.remove(sequenceNumber);
+            }
+        } else {
+            ack(kinesisMessageId);
+        }
+        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
+        if (deactivated) {
+            commit();
+        }
+    }
+
+    /**
+     * Commits, for every shard assigned to this task, the highest acked sequence number X such that no sequence number smaller than X is
+     * still emitted or failed. For e.g. if acked is 1,3,5 and emitted is 2,4,6 then only 1 can be committed because 2 is still pending.
+     * Acked sequence numbers up to and including the committed one are removed from the acked set. Also resets the last commit time.
+     */
+    void commit () {
+        for (String shardId: toEmitPerShard.keySet()) {
+            if (!ackedPerShard.containsKey(shardId)) {
+                // nothing acked for this shard so nothing to commit
+                continue;
+            }
+            // exclusive upper bound for the commit: smallest sequence number that is failed or in flight, null if nothing is pending
+            BigInteger pendingLowerBound = null;
+            if (failedPerShard.containsKey(shardId) && !failedPerShard.get(shardId).isEmpty()) {
+                pendingLowerBound = failedPerShard.get(shardId).first();
+            }
+            if (emittedPerShard.containsKey(shardId) && !emittedPerShard.get(shardId).isEmpty()) {
+                BigInteger smallestEmitted = emittedPerShard.get(shardId).first();
+                if (pendingLowerBound == null || pendingLowerBound.compareTo(smallestEmitted) > 0) {
+                    pendingLowerBound = smallestEmitted;
+                }
+            }
+            // consume acked sequence numbers in ascending order for as long as they stay below the bound; the last one consumed is committed
+            TreeSet<BigInteger> acked = ackedPerShard.get(shardId);
+            BigInteger sequenceNumberToCommit = null;
+            while (!acked.isEmpty() && (pendingLowerBound == null || pendingLowerBound.compareTo(acked.first()) > 0)) {
+                sequenceNumberToCommit = acked.pollFirst();
+            }
+            if (sequenceNumberToCommit == null) {
+                continue;
+            }
+            Map<Object, Object> state = new HashMap<>();
+            state.put("committedSequenceNumber", sequenceNumberToCommit.toString());
+            LOG.debug("Committing sequence number " + sequenceNumberToCommit.toString() + " for shardId " + shardId);
+            commitState(getZkPath(shardId), state);
+        }
+        lastCommitTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Marks this manager as active again and creates a kinesis client.
+     */
+    void activate () {
+        LOG.info("Activate called");
+        this.deactivated = false;
+        this.initializeKinesisClient();
+    }
+
+    /**
+     * Marks this manager as deactivated, commits the current state and shuts down the kinesis client.
+     * The client is shut down even when the commit throws, so a failing commit does not leak the client.
+     */
+    void deactivate () {
+        LOG.info("Deactivate called");
+        deactivated = true;
+        try {
+            commit();
+        } finally {
+            // always release the client; an exception from commit() still propagates to the caller
+            shutdownKinesisClient();
+        }
+    }
+
+    /**
+     * Commits the current state and releases the kinesis client and the curator client.
+     * Each shutdown step runs even if an earlier step throws, so a failing commit or client shutdown
+     * does not leak the remaining resources. The first exception thrown still reaches the caller unless
+     * a later step throws as well.
+     */
+    void close () {
+        try {
+            commit();
+        } finally {
+            try {
+                shutdownKinesisClient();
+            } finally {
+                shutdownCurator();
+            }
+        }
+    }
+
+    /**
+     * Builds the zookeeper path for a shard: the configured zk node (with a leading and trailing "/" added
+     * when missing), followed by the stream name, a "/" and the shard id.
+     *
+     * @param shardId id of the shard
+     * @return zookeeper path under which state for the shard is stored
+     */
+    private String getZkPath (String shardId) {
+        StringBuilder zkPath = new StringBuilder();
+        if (!config.getZkInfo().getZkNode().startsWith("/")) {
+            zkPath.append("/");
+        }
+        zkPath.append(config.getZkInfo().getZkNode());
+        if (!config.getZkInfo().getZkNode().endsWith("/")) {
+            zkPath.append("/");
+        }
+        zkPath.append(config.getStreamName() + "/" + shardId);
+        return zkPath.toString();
+    }
+
+    /**
+     * Writes the given state as UTF-8 encoded JSON to the zookeeper node at path. The node is created as a
+     * persistent node (including missing parents) when it does not exist, otherwise its data is overwritten.
+     *
+     * @param path zookeeper path of the node
+     * @param state state to serialize and store
+     * @throws RuntimeException wrapping any exception raised while talking to zookeeper
+     */
+    private void commitState (String path, Map<Object, Object> state) {
+        String json = JSONValue.toJSONString(state);
+        byte[] payload = json.getBytes(Charset.forName("UTF-8"));
+        try {
+            boolean nodeExists = curatorFramework.checkExists().forPath(path) != null;
+            if (nodeExists) {
+                curatorFramework.setData().forPath(path, payload);
+                return;
+            }
+            curatorFramework.create()
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path, payload);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Reads the state stored as UTF-8 encoded JSON in the zookeeper node at path.
+     *
+     * @param path zookeeper path of the node
+     * @return the parsed state, or null when the node does not exist or holds no data
+     * @throws RuntimeException wrapping any exception raised while reading or parsing the state
+     */
+    private Map<Object, Object> readState (String path) {
+        try {
+            if (curatorFramework.checkExists().forPath(path) == null) {
+                return null;
+            }
+            byte[] data = curatorFramework.getData().forPath(path);
+            if (data == null) {
+                return null;
+            }
+            String json = new String(data, "UTF-8");
+            return (Map<Object, Object>) JSONValue.parse(json);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Fetches records from kinesis starting at the sequence number of the message passed as argument. Any other
+     * fetched records whose sequence numbers are in the failed set of the same shard are also kept in memory, to
+     * avoid going to kinesis again when they are retried.
+     * <p>
+     * Sleeps for one second when no records are returned or when the provisioned throughput is exceeded. If the
+     * thread is interrupted while sleeping, the interrupt flag is restored so that callers can observe it.
+     *
+     * @param kinesisMessageId id of the failed message to fetch
+     */
+    private void fetchFailedRecords (KinesisMessageId kinesisMessageId) {
+        // if shard iterator not present for this message, get it
+        if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
+            refreshShardIteratorForFailedRecord(kinesisMessageId);
+        }
+        String shardIterator = shardIteratorPerFailedMessage.get(kinesisMessageId);
+        LOG.debug("Fetching failed records for shard id :" + kinesisMessageId.getShardId() + " at sequence number " + kinesisMessageId.getSequenceNumber() +
+                " using shardIterator " + shardIterator);
+        try {
+            GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+            if (getRecordsResult != null) {
+                List<Record> records = getRecordsResult.getRecords();
+                LOG.debug("Records size from fetchFailedRecords is " + records.size());
+                // update the shard iterator to next one in case this fetch does not give the message.
+                shardIteratorPerFailedMessage.put(kinesisMessageId, getRecordsResult.getNextShardIterator());
+                if (records.isEmpty()) {
+                    LOG.debug("No records returned from kinesis. Hence sleeping for 1 second");
+                    Thread.sleep(1000);
+                } else {
+                    // add all fetched records to the set of failed records if they are present in failed set
+                    for (Record record: records) {
+                        KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), kinesisMessageId.getShardId(), record.getSequenceNumber());
+                        if (failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(current.getSequenceNumber()))) {
+                            failedandFetchedRecords.put(current, record);
+                            // the record itself is now cached, so its iterator is no longer needed
+                            shardIteratorPerFailedMessage.remove(current);
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException ie) {
+            LOG.debug("Thread interrupted while sleeping", ie);
+            // restore the interrupt flag instead of swallowing the interruption
+            Thread.currentThread().interrupt();
+        } catch (ExpiredIteratorException ex) {
+            LOG.debug("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
+            refreshShardIteratorForFailedRecord(kinesisMessageId);
+        } catch (ProvisionedThroughputExceededException pe) {
+            try {
+                LOG.debug("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LOG.debug("Thread interrupted exception", e);
+                // restore the interrupt flag instead of swallowing the interruption
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Fetches new records for every shard in toEmitPerShard using the current shard iterator of the shard, appends
+     * them to the shard's list of records to emit and remembers the sequence number of the last fetched record.
+     * <p>
+     * Sleeps for one second when a shard returns no records or when the provisioned throughput is exceeded. If the
+     * thread is interrupted while sleeping, the interrupt flag is restored and fetching stops without polling the
+     * remaining shards.
+     */
+    private void fetchNewRecords () {
+        for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) {
+            String shardId = entry.getKey();
+            try {
+                String shardIterator = shardIteratorPerShard.get(shardId);
+                LOG.debug("Fetching new records for shard id :" + shardId + " using shardIterator " + shardIterator + " after sequence number " +
+                        fetchedSequenceNumberPerShard.get(shardId));
+                GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+                if (getRecordsResult != null) {
+                    List<Record> records = getRecordsResult.getRecords();
+                    LOG.debug("Records size from fetchNewRecords is " + records.size());
+                    // update the shard iterator to next one in case this fetch does not give the message.
+                    shardIteratorPerShard.put(shardId, getRecordsResult.getNextShardIterator());
+                    if (records.isEmpty()) {
+                        LOG.debug("No records returned from kinesis. Hence sleeping for 1 second");
+                        Thread.sleep(1000);
+                    } else {
+                        entry.getValue().addAll(records);
+                        fetchedSequenceNumberPerShard.put(shardId, records.get(records.size() - 1).getSequenceNumber());
+                    }
+                }
+            } catch (InterruptedException ie) {
+                LOG.debug("Thread interrupted while sleeping", ie);
+                // restore the interrupt flag and stop fetching instead of swallowing the interruption
+                Thread.currentThread().interrupt();
+                return;
+            } catch (ExpiredIteratorException ex) {
+                LOG.debug("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator");
+                refreshShardIteratorForNewRecords(shardId);
+            } catch (ProvisionedThroughputExceededException pe) {
+                try {
+                    LOG.debug("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOG.debug("Thread interrupted exception", e);
+                    // restore the interrupt flag and stop fetching instead of swallowing the interruption
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Issues a single get records request to kinesis, limited to the configured number of records.
+     *
+     * @param shardIterator shard iterator to read from
+     * @return the result returned by the kinesis client
+     */
+    private GetRecordsResult fetchRecords (String shardIterator) {
+        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+        getRecordsRequest.setShardIterator(shardIterator);
+        getRecordsRequest.setLimit(config.getKinesisConnectionInfo().getRecordsLimit());
+        return kinesisClient.getRecords(getRecordsRequest);
+    }
+
+    /**
+     * Describes the configured stream repeatedly, page by page, and collects all of its shards. Paging continues
+     * from the id of the last shard collected for as long as the stream description reports more shards.
+     *
+     * @return all shards collected for the configured stream
+     */
+    private List<Shard> getShards () {
+        List<Shard> allShards = new ArrayList<>();
+        DescribeStreamRequest request = new DescribeStreamRequest();
+        request.setStreamName(config.getStreamName());
+        String startAfterShardId = null;
+        do {
+            request.setExclusiveStartShardId(startAfterShardId);
+            DescribeStreamResult page = kinesisClient.describeStream(request);
+            allShards.addAll(page.getStreamDescription().getShards());
+            boolean morePages = page.getStreamDescription().getHasMoreShards() && allShards.size() > 0;
+            // the next page starts right after the last shard seen so far
+            startAfterShardId = morePages ? allShards.get(allShards.size() - 1).getShardId() : null;
+        } while (startAfterShardId != null);
+        LOG.info("Number of shards for stream " + config.getStreamName() + " are " + allShards.size());
+        return allShards;
+    }
+
+    /**
+     * Emits at most one new record. Records are taken from the head of each shard's list of fetched records
+     * until one of them is actually emitted; records that are taken but not emitted are not put back.
+     *
+     * @param collector collector used to emit the tuple
+     */
+    private void emitNewRecord (SpoutOutputCollector collector) {
+        for (Map.Entry<String, LinkedList<Record>> shardRecords: toEmitPerShard.entrySet()) {
+            LinkedList<Record> pending = shardRecords.getValue();
+            for (Record next = pending.pollFirst(); next != null; next = pending.pollFirst()) {
+                KinesisMessageId messageId = new KinesisMessageId(config.getStreamName(), shardRecords.getKey(), next.getSequenceNumber());
+                boolean emitted = emitRecord(collector, next, messageId);
+                if (emitted) {
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Emits a previously failed record if it has already been fetched again and is held in memory.
+     *
+     * @param collector collector used to emit the tuple
+     * @param kinesisMessageId id of the failed message
+     * @return true if a tuple was emitted, false if the record is not available or produced no tuple
+     */
+    private boolean emitFailedRecord (SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
+        boolean alreadyFetched = failedandFetchedRecords.containsKey(kinesisMessageId);
+        return alreadyFetched && emitRecord(collector, failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId);
+    }
+
+    /**
+     * Maps the record to a tuple and emits it anchored to the given message id. When the mapper returns null or
+     * an empty tuple, nothing is emitted and the message is acked instead, so that it is not processed again on
+     * restart.
+     *
+     * @param collector collector used to emit the tuple
+     * @param record kinesis record to emit
+     * @param kinesisMessageId message id to emit the tuple with
+     * @return true if a tuple was emitted, false otherwise
+     */
+    private boolean emitRecord (SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
+        List<Object> values = config.getRecordToTupleMapper().getTuple(record);
+        if (values == null || values.isEmpty()) {
+            // ack to not process the record again on restart and move on to next message
+            LOG.debug("Record " + record + " did not return a tuple to emit. Hence acking it");
+            ack(kinesisMessageId);
+            return false;
+        }
+        collector.emit(values, kinesisMessageId);
+        // remember the sequence number as emitted to tie back with ack or fail
+        String shardId = kinesisMessageId.getShardId();
+        if (!emittedPerShard.containsKey(shardId)) {
+            emittedPerShard.put(shardId, new TreeSet<BigInteger>());
+        }
+        emittedPerShard.get(shardId).add(new BigInteger(record.getSequenceNumber()));
+        return true;
+    }
+
+    /**
+     * @return true if at least the configured commit interval has elapsed since the last commit
+     */
+    private boolean shouldCommit () {
+        long elapsedSinceLastCommit = System.currentTimeMillis() - lastCommitTime;
+        return elapsedSinceLastCommit >= config.getZkInfo().getCommitIntervalMs();
+    }
+
+    /**
+     * For every shard in toEmitPerShard, reads the state stored in zookeeper and, when a committed sequence
+     * number is present, records it as the last fetched sequence number for that shard.
+     */
+    private void initializeFetchedSequenceNumbers () {
+        for (String shardId : toEmitPerShard.keySet()) {
+            Map<Object, Object> storedState = readState(getZkPath(shardId));
+            if (storedState == null) {
+                // no state in zk for this shard
+                continue;
+            }
+            Object committed = storedState.get("committedSequenceNumber");
+            LOG.info("State read is committedSequenceNumber: " + committed + " shardId:" + shardId);
+            if (committed == null) {
+                continue;
+            }
+            fetchedSequenceNumberPerShard.put(shardId, (String) committed);
+        }
+    }
+
+    /**
+     * Refreshes the shard iterator used for fetching new records for every shard in toEmitPerShard.
+     */
+    private void refreshShardIteratorsForNewRecords () {
+        Iterator<String> shardIds = toEmitPerShard.keySet().iterator();
+        while (shardIds.hasNext()) {
+            refreshShardIteratorForNewRecords(shardIds.next());
+        }
+    }
+
+    /**
+     * Obtains a shard iterator for fetching new records from the shard and stores it when one is returned.
+     * When a sequence number has already been fetched for the shard, the iterator starts after that sequence
+     * number; otherwise the configured shard iterator type is used.
+     *
+     * @param shardId id of the shard
+     */
+    private void refreshShardIteratorForNewRecords (String shardId) {
+        String lastFetched = fetchedSequenceNumberPerShard.get(shardId);
+        ShardIteratorType iteratorType;
+        if (lastFetched == null) {
+            iteratorType = config.getShardIteratorType();
+        } else {
+            iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+        }
+        // position the iterator relative to the last fetched sequence number to start from the correct place in the shard
+        String refreshed = this.getShardIterator(shardId, iteratorType, lastFetched, config.getTimestamp());
+        if (refreshed == null || refreshed.isEmpty()) {
+            return;
+        }
+        LOG.debug("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + refreshed);
+        shardIteratorPerShard.put(shardId, refreshed);
+    }
+
+    /**
+     * Obtains a shard iterator positioned at the sequence number of the failed message and stores it for that
+     * message when one is returned.
+     *
+     * @param kinesisMessageId id of the failed message
+     */
+    private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) {
+        String refreshed = this.getShardIterator(kinesisMessageId.getShardId(), ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+        if (refreshed == null || refreshed.isEmpty()) {
+            return;
+        }
+        LOG.debug("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + refreshed);
+        shardIteratorPerFailedMessage.put(kinesisMessageId, refreshed);
+    }
+
+    /**
+     * Requests a shard iterator from kinesis. The sequence number is only sent for the AFTER_SEQUENCE_NUMBER and
+     * AT_SEQUENCE_NUMBER iterator types, the timestamp only for AT_TIMESTAMP. Any exception raised while building
+     * or sending the request is logged at debug level and not rethrown.
+     *
+     * @param shardId id of the shard
+     * @param shardIteratorType type of iterator to request
+     * @param sequenceNumber sequence number for sequence number based iterator types
+     * @param timestamp timestamp for the AT_TIMESTAMP iterator type
+     * @return the shard iterator from the result, or an empty string when the request failed or returned no result
+     */
+    private String getShardIterator (String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+        String iterator = "";
+        try {
+            GetShardIteratorRequest request = new GetShardIteratorRequest();
+            request.setStreamName(config.getStreamName());
+            request.setShardId(shardId);
+            request.setShardIteratorType(shardIteratorType);
+            boolean sequenceNumberBased = shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                    || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER);
+            if (sequenceNumberBased) {
+                request.setStartingSequenceNumber(sequenceNumber);
+            } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
+                request.setTimestamp(timestamp);
+            }
+            GetShardIteratorResult response = kinesisClient.getShardIterator(request);
+            if (response != null) {
+                iterator = response.getShardIterator();
+            }
+        } catch (Exception e) {
+            LOG.debug("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
+                    sequenceNumber + " timestamp " + timestamp, e);
+        }
+        LOG.debug("Returning shardIterator " + iterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
+                sequenceNumber + " timestamp" + timestamp);
+        return iterator;
+    }
+
+    /**
+     * Counts the sequence numbers currently tracked as emitted, acked or failed across all shards.
+     *
+     * @return total number of entries in the emitted, acked and failed sets
+     */
+    private Long getUncommittedRecordsCount () {
+        // primitive accumulator avoids unboxing and re-boxing a Long on every addition
+        long result = 0L;
+        for (TreeSet<BigInteger> emitted: emittedPerShard.values()) {
+            result += emitted.size();
+        }
+        for (TreeSet<BigInteger> acked: ackedPerShard.values()) {
+            result += acked.size();
+        }
+        for (TreeSet<BigInteger> failed: failedPerShard.values()) {
+            result += failed.size();
+        }
+        LOG.debug("Returning uncommittedRecordsCount as " + result);
+        return result;
+    }
+
+    /**
+     * @return false if any shard still has fetched records waiting to be emitted, true otherwise
+     */
+    private boolean shouldFetchNewRecords () {
+        for (LinkedList<Record> pending: toEmitPerShard.values()) {
+            if (!pending.isEmpty()) {
+                // records already fetched for this shard are still waiting, so do not fetch more
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Creates the curator client from the configured zookeeper settings, using a retry-n-times policy, and
+     * starts it.
+     */
+    private void initializeCurator () {
+        ZkInfo zk = config.getZkInfo();
+        curatorFramework = CuratorFrameworkFactory.newClient(
+                zk.getZkUrl(),
+                zk.getSessionTimeoutMs(),
+                zk.getConnectionTimeoutMs(),
+                new RetryNTimes(zk.getRetryAttempts(), zk.getRetryIntervalMs()));
+        curatorFramework.start();
+    }
+
+    /**
+     * Creates the kinesis client from the configured credentials provider and client configuration and points
+     * it at the configured region.
+     */
+    private void initializeKinesisClient () {
+        kinesisClient = new AmazonKinesisClient(
+                config.getKinesisConnectionInfo().getCredentialsProvider(),
+                config.getKinesisConnectionInfo().getClientConfiguration());
+        Region region = Region.getRegion(config.getKinesisConnectionInfo().getRegion());
+        kinesisClient.setRegion(region);
+    }
+
+    /**
+     * Closes the curator client.
+     */
+    private void shutdownCurator () {
+        this.curatorFramework.close();
+    }
+
+    /**
+     * Shuts down the kinesis client.
+     */
+    private void shutdownKinesisClient () {
+        this.kinesisClient.shutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
new file mode 100644
index 0000000..1ead4c0
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KinesisSpout extends BaseRichSpout {
+
+    private final Config config;
+    private transient KinesisRecordsManager kinesisRecordsManager;
+    private transient SpoutOutputCollector collector;
+
+    public KinesisSpout (Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(config.getRecordToTupleMapper().getOutputFields());
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration () {
+        return super.getComponentConfiguration();
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        kinesisRecordsManager = new KinesisRecordsManager(config);
+        kinesisRecordsManager.initialize(context.getThisTaskIndex(), context.getComponentTasks(context.getThisComponentId()).size());
+    }
+
+    @Override
+    public void close() {
+        kinesisRecordsManager.close();
+    }
+
+    @Override
+    public void activate() {
+        kinesisRecordsManager.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        kinesisRecordsManager.deactivate();
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        kinesisRecordsManager.ack((KinesisMessageId) msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        kinesisRecordsManager.fail((KinesisMessageId) msgId);
+    }
+
+    @Override
+    public void nextTuple() {
+        kinesisRecordsManager.next(collector);
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
new file mode 100644
index 0000000..c806539
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+/**
+ * Converts kinesis records into storm tuples and declares the fields of those tuples.
+ */
+public interface RecordToTupleMapper {
+    /**
+     * Returns the names of the fields of the tuples produced by {@link #getTuple(Record)}.
+     *
+     * @return names of fields in the emitted tuple
+     */
+    Fields getOutputFields ();
+
+    /**
+     * Converts a kinesis record into the values of the storm tuple to emit for it.
+     *
+     * @param record kinesis record
+     * @return storm tuple to be emitted for this record, null if no tuple should be emitted
+     */
+    List<Object> getTuple (Record record);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/de68c267/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
new file mode 100644
index 0000000..17bcd6f
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+public class ZkInfo implements Serializable {
+    // comma separated list of zk connect strings to connect to zookeeper e.g. localhost:2181
+    private final String zkUrl;
+    // zk node under which to commit the sequence number of messages. e.g. /committed_sequence_numbers
+    private final String zkNode;
+    // zk session timeout in milliseconds
+    private final Integer sessionTimeoutMs;
+    // zk connection timeout in milliseconds
+    private final Integer connectionTimeoutMs;
+    // interval at which to commit offsets to zk in milliseconds
+    private final Long commitIntervalMs;
+    // number of retry attempts for zk
+    private final Integer retryAttempts;
+    // time to sleep between retries in milliseconds
+    private final Integer retryIntervalMs;
+
+    /**
+     * Default constructor that uses defaults for a local setup
+     */
+    public ZkInfo () {
+        this("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+    }
+
+    public ZkInfo (String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs, Integer retryAttempts, Integer
+            retryIntervalMs) {
+        this.zkUrl = zkUrl;
+        this.zkNode = zkNode;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.connectionTimeoutMs = connectionTimeoutMs;
+        this.commitIntervalMs = commitIntervalMs;
+        this.retryAttempts = retryAttempts;
+        this.retryIntervalMs = retryIntervalMs;
+        validate();
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    public String getZkNode() {
+        return zkNode;
+    }
+
+    public Integer getSessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    public Integer getConnectionTimeoutMs() {
+        return connectionTimeoutMs;
+    }
+
+    public Long getCommitIntervalMs() {
+        return commitIntervalMs;
+    }
+
+    public Integer getRetryAttempts() {
+        return retryAttempts;
+    }
+
+    public Integer getRetryIntervalMs() {
+        return retryIntervalMs;
+    }
+
+    private void validate () {
+
+        if (zkUrl == null || zkUrl.length() < 1) {
+            throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper");
+        }
+        if (zkNode == null || zkNode.length() < 1) {
+            throw new IllegalArgumentException("zkNode must be specified");
+        }
+        checkPositive(sessionTimeoutMs, "sessionTimeoutMs");
+        checkPositive(connectionTimeoutMs, "connectionTimeoutMs");
+        checkPositive(commitIntervalMs, "commitIntervalMs");
+        checkPositive(retryAttempts, "retryAttempts");
+        checkPositive(retryIntervalMs, "retryIntervalMs");
+    }
+
+    private void checkPositive (Integer argument, String name) {
+        if (argument == null && argument <= 0) {
+            throw new IllegalArgumentException(name + " must be positive");
+        }
+    }
+    private void checkPositive (Long argument, String name) {
+        if (argument == null && argument <= 0) {
+            throw new IllegalArgumentException(name + " must be positive");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ZkInfo{" +
+                "zkUrl='" + zkUrl + '\'' +
+                ", zkNode='" + zkNode + '\'' +
+                ", sessionTimeoutMs=" + sessionTimeoutMs +
+                ", connectionTimeoutMs=" + connectionTimeoutMs +
+                ", commitIntervalMs=" + commitIntervalMs +
+                ", retryAttempts=" + retryAttempts +
+                ", retryIntervalMs=" + retryIntervalMs +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ZkInfo zkInfo = (ZkInfo) o;
+
+        if (zkUrl != null ? !zkUrl.equals(zkInfo.zkUrl) : zkInfo.zkUrl != null) return false;
+        if (zkNode != null ? !zkNode.equals(zkInfo.zkNode) : zkInfo.zkNode != null) return false;
+        if (sessionTimeoutMs != null ? !sessionTimeoutMs.equals(zkInfo.sessionTimeoutMs) : zkInfo.sessionTimeoutMs != null) return false;
+        if (connectionTimeoutMs != null ? !connectionTimeoutMs.equals(zkInfo.connectionTimeoutMs) : zkInfo.connectionTimeoutMs != null) return false;
+        if (commitIntervalMs != null ? !commitIntervalMs.equals(zkInfo.commitIntervalMs) : zkInfo.commitIntervalMs != null) return false;
+        if (retryAttempts != null ? !retryAttempts.equals(zkInfo.retryAttempts) : zkInfo.retryAttempts != null) return false;
+        return !(retryIntervalMs != null ? !retryIntervalMs.equals(zkInfo.retryIntervalMs) : zkInfo.retryIntervalMs != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = zkUrl != null ? zkUrl.hashCode() : 0;
+        result = 31 * result + (zkNode != null ? zkNode.hashCode() : 0);
+        result = 31 * result + (sessionTimeoutMs != null ? sessionTimeoutMs.hashCode() : 0);
+        result = 31 * result + (connectionTimeoutMs != null ? connectionTimeoutMs.hashCode() : 0);
+        result = 31 * result + (commitIntervalMs != null ? commitIntervalMs.hashCode() : 0);
+        result = 31 * result + (retryAttempts != null ? retryAttempts.hashCode() : 0);
+        result = 31 * result + (retryIntervalMs != null ? retryIntervalMs.hashCode() : 0);
+        return result;
+    }
+}