You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/08/03 04:58:21 UTC

[4/6] storm git commit: STORM-1839: Move zk and kinesis interaction into their own classes.

STORM-1839: Move zk and kinesis interaction into their own classes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b59c75c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b59c75c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b59c75c0

Branch: refs/heads/master
Commit: b59c75c052ee73203775d9acc2cecb7c691de940
Parents: 49c4747
Author: Priyank <ps...@hortonworks.com>
Authored: Mon Aug 1 12:19:23 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Mon Aug 1 12:19:23 2016 -0700

----------------------------------------------------------------------
 .../storm/kinesis/spout/KinesisConnection.java  | 108 +++++++++++
 .../kinesis/spout/KinesisRecordsManager.java    | 187 ++++---------------
 .../storm/kinesis/spout/ZKConnection.java       |  95 ++++++++++
 3 files changed, 238 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
new file mode 100644
index 0000000..dfd9049
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Owns the {@link AmazonKinesisClient} used by the spout: creates it from a {@link KinesisConnectionInfo}, lists the shards
+ * of a stream, obtains shard iterators, fetches records and shuts the client down.
+ */
+class KinesisConnection {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisConnection.class);
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    // created by initialize() and released by shutdown(); null while no client is open
+    private AmazonKinesisClient kinesisClient;
+
+    KinesisConnection (KinesisConnectionInfo kinesisConnectionInfo) {
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+    }
+
+    /**
+     * Creates a kinesis client for the configured credentials, client configuration and region. A client left over from an
+     * earlier call is shut down first, so calling this again (KinesisRecordsManager calls it from both initialize and
+     * activate) does not leak the previous client.
+     */
+    void initialize () {
+        shutdown();
+        kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), kinesisConnectionInfo.getClientConfiguration());
+        kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion()));
+    }
+
+    /**
+     * Lists every shard of the given stream, following DescribeStream pagination until no more shards are reported.
+     *
+     * @param stream name of the kinesis stream
+     * @return all shards of the stream, in the order kinesis returned them
+     */
+    List<Shard> getShardsForStream (String stream) {
+        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+        describeStreamRequest.setStreamName(stream);
+        List<Shard> shards = new ArrayList<>();
+        String exclusiveStartShardId = null;
+        do {
+            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
+            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+            shards.addAll(describeStreamResult.getStreamDescription().getShards());
+            // the next page starts after the last shard collected so far
+            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
+                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
+            } else {
+                exclusiveStartShardId = null;
+            }
+        } while ( exclusiveStartShardId != null );
+        LOG.info("Number of shards for stream " + stream + " are " + shards.size());
+        return shards;
+    }
+
+    /**
+     * Obtains a shard iterator for the shard. The sequence number is only sent for AT_SEQUENCE_NUMBER and
+     * AFTER_SEQUENCE_NUMBER, the timestamp only for AT_TIMESTAMP.
+     *
+     * @return the shard iterator; an empty string if the request threw or returned no result (the exception is logged)
+     */
+    String getShardIterator (String stream, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+        String shardIterator = "";
+        try {
+            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
+            getShardIteratorRequest.setStreamName(stream);
+            getShardIteratorRequest.setShardId(shardId);
+            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
+            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
+            } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
+                getShardIteratorRequest.setTimestamp(timestamp);
+            }
+            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
+            if (getShardIteratorResult != null) {
+                shardIterator = getShardIteratorResult.getShardIterator();
+            }
+        } catch (Exception e) {
+            LOG.warn("Exception occurred while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
+                    sequenceNumber + " timestamp " + timestamp, e);
+        }
+        // this runs on every lookup, including successful ones, so it is debug rather than warn to keep logs quiet
+        LOG.debug("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
+                sequenceNumber + " timestamp " + timestamp);
+        return shardIterator;
+    }
+
+    /**
+     * Fetches the next batch of records for a shard iterator, limited to the configured records limit.
+     *
+     * @param shardIterator iterator previously obtained from {@link #getShardIterator}
+     * @return the GetRecords result returned by kinesis
+     */
+    GetRecordsResult fetchRecords (String shardIterator) {
+        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+        getRecordsRequest.setShardIterator(shardIterator);
+        getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit());
+        return kinesisClient.getRecords(getRecordsRequest);
+    }
+
+    /**
+     * Shuts down the kinesis client if one is open. Safe to call more than once and before {@link #initialize()}.
+     */
+    void shutdown () {
+        if (kinesisClient != null) {
+            kinesisClient.shutdown();
+            kinesisClient = null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
index bdd054f..7f3f024 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -18,32 +18,17 @@
 
 package org.apache.storm.kinesis.spout;
 
-import com.amazonaws.regions.Region;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.zookeeper.CreateMode;
-import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -53,8 +38,10 @@ import java.util.TreeSet;
 
 class KinesisRecordsManager {
     private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
-    // zk interaction object
-    private transient CuratorFramework curatorFramework;
+    // object handling zk interaction
+    private transient ZKConnection zkConnection;
+    // object handling interaction with kinesis
+    private transient KinesisConnection kinesisConnection;
     // Kinesis Spout KinesisConfig object
     private transient final KinesisConfig kinesisConfig;
     // Queue of records per shard fetched from kinesis and are waiting to be emitted
@@ -78,18 +65,19 @@ class KinesisRecordsManager {
     private transient long lastCommitTime;
     // boolean to track deactivated state
     private transient boolean deactivated;
-    private transient AmazonKinesisClient kinesisClient;
 
     KinesisRecordsManager (KinesisConfig kinesisConfig) {
         this.kinesisConfig = kinesisConfig;
+        this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+        this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
     }
 
     void initialize (int myTaskIndex, int totalTasks) {
         deactivated = false;
         lastCommitTime = System.currentTimeMillis();
-        initializeKinesisClient();
-        initializeCurator();
-        List<Shard> shards = this.getShards();
+        kinesisConnection.initialize();
+        zkConnection.initialize();
+        List<Shard> shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
         LOG.info("myTaskIndex is " + myTaskIndex);
         LOG.info("totalTasks is " + totalTasks);
         int i = myTaskIndex;
@@ -149,17 +137,24 @@ class KinesisRecordsManager {
         String shardId = kinesisMessageId.getShardId();
         BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
         LOG.debug("Ack received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while committing we need to figure out what is the
+        // highest sequence number that can be committed for this shard
         if (!ackedPerShard.containsKey(shardId)) {
             ackedPerShard.put(shardId, new TreeSet<BigInteger>());
         }
         ackedPerShard.get(shardId).add(sequenceNumber);
+        // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard(which keeps track of in flight tuples)
         if (emittedPerShard.containsKey(shardId)) {
             TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
             emitted.remove(sequenceNumber);
         }
+        // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive coding.
+        // Remove it from failedPerShard anyway
         if (failedPerShard.containsKey(shardId)) {
             failedPerShard.get(shardId).remove(sequenceNumber);
         }
+        // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in failedAndFetchedRecords. We use that to
+        // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to clean up memory
         if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
             kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
             failedandFetchedRecords.remove(kinesisMessageId);
@@ -192,8 +187,13 @@ class KinesisRecordsManager {
     }
 
     void commit () {
-        // Logic for deciding what sequence number to ack is find the highest sequence number from acked called X such that there is no sequence number Y in
-        // emitted or failed that satisfies X > Y. For e.g. is acked is 1,3,5. Emitted is 2,4,6 then we can only commit 1 and not 3 because 2 is still pending
+        // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number can be committed to zookeeper.
+        // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack it moves from emittedPerShard to
+        // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to failedPerShard. The failed records will move from
+        // failedPerShard to emittedPerShard when the failed record is emitted again as a retry.
+        // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard called X such that there is no sequence
+        // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and
+        // failedPerShard is 3,7 then we can only commit 1 and not 4 because 2 is still pending and 3 has failed
         for (String shardId: toEmitPerShard.keySet()) {
             if (ackedPerShard.containsKey(shardId)) {
                 BigInteger commitSequenceNumberBound = null;
@@ -221,8 +221,7 @@ class KinesisRecordsManager {
                     Map<Object, Object> state = new HashMap<>();
                     state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString());
                     LOG.debug("Committing sequence number " + ackedSequenceNumberToCommit.toString() + " for shardId " + shardId);
-                    String path = getZkPath(shardId);
-                    commitState(path, state);
+                    zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
                 }
             }
         }
@@ -232,65 +231,20 @@ class KinesisRecordsManager {
     void activate () {
         LOG.info("Activate called");
         deactivated = false;
-        initializeKinesisClient();
+        kinesisConnection.initialize();
     }
 
     void deactivate () {
         LOG.info("Deactivate called");
         deactivated = true;
         commit();
-        shutdownKinesisClient();
+        kinesisConnection.shutdown();
     }
 
     void close () {
         commit();
-        shutdownKinesisClient();
-        shutdownCurator();
-    }
-
-    private String getZkPath (String shardId) {
-        String path = "";
-        if (!kinesisConfig.getZkInfo().getZkNode().startsWith("/")) {
-            path += "/";
-        }
-        path += kinesisConfig.getZkInfo().getZkNode();
-        if (!kinesisConfig.getZkInfo().getZkNode().endsWith("/")) {
-            path += "/";
-        }
-        path += (kinesisConfig.getStreamName() + "/" + shardId);
-        return path;
-    }
-
-    private void commitState (String path, Map<Object, Object> state) {
-        byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
-        try {
-            if (curatorFramework.checkExists().forPath(path) == null) {
-                curatorFramework.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, bytes);
-            } else {
-                curatorFramework.setData().forPath(path, bytes);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private Map<Object, Object> readState (String path) {
-        try {
-            Map<Object, Object> state = null;
-            byte[] b = null;
-            if (curatorFramework.checkExists().forPath(path) != null) {
-                b = curatorFramework.getData().forPath(path);
-            }
-            if (b != null) {
-                state = (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
-            }
-            return state;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        kinesisConnection.shutdown();
+        zkConnection.shutdown();
     }
 
     // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched and are in the failed queue will also
@@ -304,7 +258,7 @@ class KinesisRecordsManager {
         LOG.debug("Fetching failed records for shard id :" + kinesisMessageId.getShardId() + " at sequence number " + kinesisMessageId.getSequenceNumber() +
                 " using shardIterator " + shardIterator);
         try {
-            GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+            GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
             if (getRecordsResult != null) {
                 List<Record> records = getRecordsResult.getRecords();
                 LOG.debug("Records size from fetchFailedRecords is " + records.size());
@@ -346,7 +300,7 @@ class KinesisRecordsManager {
                 String shardIterator = shardIteratorPerShard.get(shardId);
                 LOG.debug("Fetching new records for shard id :" + shardId + " using shardIterator " + shardIterator + " after sequence number " +
                         fetchedSequenceNumberPerShard.get(shardId));
-                GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+                GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
                 if (getRecordsResult != null) {
                     List<Record> records = getRecordsResult.getRecords();
                     LOG.debug("Records size from fetchNewRecords is " + records.size());
@@ -376,34 +330,6 @@ class KinesisRecordsManager {
         }
     }
 
-    private GetRecordsResult fetchRecords (String shardIterator) {
-        List<Record> records = new ArrayList<>();
-        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
-        getRecordsRequest.setShardIterator(shardIterator);
-        getRecordsRequest.setLimit(kinesisConfig.getKinesisConnectionInfo().getRecordsLimit());
-        GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
-        return getRecordsResult;
-    }
-
-    private List<Shard> getShards () {
-        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-        describeStreamRequest.setStreamName(kinesisConfig.getStreamName());
-        List<Shard> shards = new ArrayList<>();
-        String exclusiveStartShardId = null;
-        do {
-            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
-            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
-            shards.addAll(describeStreamResult.getStreamDescription().getShards());
-            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
-                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
-            } else {
-                exclusiveStartShardId = null;
-            }
-        } while ( exclusiveStartShardId != null );
-        LOG.info("Number of shards for stream " + kinesisConfig.getStreamName() + " are " + shards.size());
-        return shards;
-    }
-
     private void emitNewRecord (SpoutOutputCollector collector) {
         for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
             String shardId = entry.getKey();
@@ -450,7 +376,7 @@ class KinesisRecordsManager {
 
     private void initializeFetchedSequenceNumbers () {
         for (String shardId : toEmitPerShard.keySet()) {
-            Map<Object, Object> state = readState(getZkPath(shardId));
+            Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId);
             // if state found for this shard in zk, then set the sequence number in fetchedSequenceNumber
             if (state != null) {
                 Object committedSequenceNumber = state.get("committedSequenceNumber");
@@ -474,7 +400,8 @@ class KinesisRecordsManager {
         ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? kinesisConfig.getShardIteratorType() : ShardIteratorType
                 .AFTER_SEQUENCE_NUMBER);
         // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = this.getShardIterator(shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig.getTimestamp());
+        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig
+                .getTimestamp());
         if (shardIterator != null && !shardIterator.isEmpty()) {
             LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator);
             shardIteratorPerShard.put(shardId, shardIterator);
@@ -484,38 +411,14 @@ class KinesisRecordsManager {
     private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) {
         String shardIterator = null;
         // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = this.getShardIterator(kinesisMessageId.getShardId(), ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), kinesisMessageId.getShardId(), ShardIteratorType
+                .AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
         if (shardIterator != null && !shardIterator.isEmpty()) {
             LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
             shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
         }
     }
 
-    private String getShardIterator (String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
-        String shardIterator = "";
-        try {
-            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
-            getShardIteratorRequest.setStreamName(kinesisConfig.getStreamName());
-            getShardIteratorRequest.setShardId(shardId);
-            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
-            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
-                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
-            } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
-                getShardIteratorRequest.setTimestamp(timestamp);
-            }
-            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
-            if (getShardIteratorResult != null) {
-                shardIterator = getShardIteratorResult.getShardIterator();
-            }
-        } catch (Exception e) {
-            LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
-                    sequenceNumber + " timestamp " + timestamp, e);
-        }
-        LOG.warn("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
-                sequenceNumber + " timestamp" + timestamp);
-        return shardIterator;
-    }
-
     private Long getUncommittedRecordsCount () {
         Long result = 0L;
         for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) {
@@ -543,24 +446,4 @@ class KinesisRecordsManager {
         return fetchRecords;
     }
 
-    private void initializeCurator () {
-        ZkInfo zkInfo = kinesisConfig.getZkInfo();
-        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
-                RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
-        curatorFramework.start();
-    }
-
-    private void initializeKinesisClient () {
-        kinesisClient = new AmazonKinesisClient(kinesisConfig.getKinesisConnectionInfo().getCredentialsProvider(), kinesisConfig.getKinesisConnectionInfo().getClientConfiguration());
-        kinesisClient.setRegion(Region.getRegion(kinesisConfig.getKinesisConnectionInfo().getRegion()));
-    }
-
-    private void shutdownCurator () {
-        curatorFramework.close();
-    }
-
-    private void shutdownKinesisClient () {
-        kinesisClient.shutdown();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
new file mode 100644
index 0000000..41151d1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Stores and reads per-shard spout state in zookeeper through a curator client. The state of a shard is kept as UTF-8
+ * encoded JSON in the node {@code /<zkNode>/<stream>/<shardId>}.
+ */
+class ZKConnection {
+
+    private final ZkInfo zkInfo;
+    // created by initialize() and released by shutdown(); null while no client is open
+    private CuratorFramework curatorFramework;
+
+    ZKConnection (ZkInfo zkInfo) {
+        this.zkInfo = zkInfo;
+    }
+
+    /**
+     * Creates and starts a curator client using the url, timeouts and retry settings from {@link ZkInfo}.
+     */
+    void initialize () {
+        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
+                RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
+        curatorFramework.start();
+    }
+
+    /**
+     * Writes the state as UTF-8 JSON to the node of the shard. If the node does not exist yet, it is created (with any
+     * missing parents) as a persistent node; otherwise its data is overwritten.
+     *
+     * @throws RuntimeException wrapping any failure reported by curator
+     */
+    void commitState (String stream, String shardId, Map<Object, Object> state) {
+        byte[] bytes = JSONValue.toJSONString(state).getBytes(StandardCharsets.UTF_8);
+        String path = getZkPath(stream, shardId);
+        try {
+            if (curatorFramework.checkExists().forPath(path) == null) {
+                curatorFramework.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, bytes);
+            } else {
+                curatorFramework.setData().forPath(path, bytes);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to commit state " + state + " to zk path " + path, e);
+        }
+    }
+
+    /**
+     * Reads the state stored for the shard.
+     *
+     * @return the state parsed from the node's UTF-8 JSON data, or null if the node does not exist or holds no data
+     * @throws RuntimeException wrapping any failure reported by curator
+     */
+    @SuppressWarnings("unchecked") // the node's data is written by commitState from a Map
+    Map<Object, Object> readState (String stream, String shardId) {
+        String path = getZkPath(stream, shardId);
+        try {
+            Map<Object, Object> state = null;
+            byte[] b = null;
+            if (curatorFramework.checkExists().forPath(path) != null) {
+                b = curatorFramework.getData().forPath(path);
+            }
+            if (b != null) {
+                state = (Map<Object, Object>) JSONValue.parse(new String(b, StandardCharsets.UTF_8));
+            }
+            return state;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to read state from zk path " + path, e);
+        }
+    }
+
+    /**
+     * Closes the curator client if one is open. Safe to call more than once and before {@link #initialize()}.
+     */
+    void shutdown () {
+        if (curatorFramework != null) {
+            curatorFramework.close();
+            curatorFramework = null;
+        }
+    }
+
+    /**
+     * Builds the node path {@code /<zkNode>/<stream>/<shardId>}, adding the leading and separating slashes only where they
+     * are missing. The separator check looks at the path built so far, so an empty zkNode does not produce "//".
+     */
+    private String getZkPath (String stream, String shardId) {
+        String path = "";
+        if (!zkInfo.getZkNode().startsWith("/")) {
+            path += "/";
+        }
+        path += zkInfo.getZkNode();
+        if (!path.endsWith("/")) {
+            path += "/";
+        }
+        path += (stream + "/" + shardId);
+        return path;
+    }
+}