You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/08/03 04:58:21 UTC
[4/6] storm git commit: STORM-1839: Move zk and kinesis interaction
in to its own class.
STORM-1839: Move zk and kinesis interaction in to its own class.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b59c75c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b59c75c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b59c75c0
Branch: refs/heads/master
Commit: b59c75c052ee73203775d9acc2cecb7c691de940
Parents: 49c4747
Author: Priyank <ps...@hortonworks.com>
Authored: Mon Aug 1 12:19:23 2016 -0700
Committer: Priyank <ps...@hortonworks.com>
Committed: Mon Aug 1 12:19:23 2016 -0700
----------------------------------------------------------------------
.../storm/kinesis/spout/KinesisConnection.java | 108 +++++++++++
.../kinesis/spout/KinesisRecordsManager.java | 187 ++++---------------
.../storm/kinesis/spout/ZKConnection.java | 95 ++++++++++
3 files changed, 238 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
new file mode 100644
index 0000000..dfd9049
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Wraps the {@link AmazonKinesisClient} used by the Kinesis spout and exposes the calls the spout needs:
+ * listing the shards of a stream, obtaining a shard iterator and fetching records.
+ *
+ * {@link #initialize()} has to be called before any method that talks to Kinesis, because the client is only
+ * created there and not in the constructor.
+ */
+class KinesisConnection {
+    // Bound to this class so that log output is attributed to KinesisConnection and not to KinesisRecordsManager
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisConnection.class);
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    private AmazonKinesisClient kinesisClient;
+
+    /**
+     * @param kinesisConnectionInfo source of the credentials provider, client configuration, region and records limit
+     */
+    KinesisConnection (KinesisConnectionInfo kinesisConnectionInfo) {
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+    }
+
+    /**
+     * Creates the Kinesis client and points it at the configured region.
+     *
+     * This method can be invoked more than once on the same instance (KinesisRecordsManager calls it from both
+     * initialize and activate), so a client left over from an earlier call is shut down first instead of being
+     * silently dropped.
+     */
+    void initialize () {
+        if (kinesisClient != null) {
+            kinesisClient.shutdown();
+        }
+        kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), kinesisConnectionInfo.getClientConfiguration());
+        kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion()));
+    }
+
+    /**
+     * Lists all shards of a stream, following the paging of describeStream until no more shards are reported.
+     *
+     * @param stream name of the kinesis stream
+     * @return every shard returned by describeStream for the stream, in the order received
+     */
+    List<Shard> getShardsForStream (String stream) {
+        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+        describeStreamRequest.setStreamName(stream);
+        List<Shard> shards = new ArrayList<>();
+        String exclusiveStartShardId = null;
+        do {
+            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
+            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+            shards.addAll(describeStreamResult.getStreamDescription().getShards());
+            // getHasMoreShards returns a boxed Boolean; compare against TRUE so that a missing value ends the paging instead of throwing
+            boolean hasMoreShards = Boolean.TRUE.equals(describeStreamResult.getStreamDescription().getHasMoreShards());
+            if (hasMoreShards && !shards.isEmpty()) {
+                // the next page starts after the last shard seen so far
+                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
+            } else {
+                exclusiveStartShardId = null;
+            }
+        } while (exclusiveStartShardId != null);
+        LOG.info("Number of shards for stream {} are {}", stream, shards.size());
+        return shards;
+    }
+
+    /**
+     * Requests a shard iterator from Kinesis. Failures are logged and swallowed on purpose: callers check the returned
+     * value for null or empty instead of handling an exception.
+     *
+     * @param stream name of the kinesis stream
+     * @param shardId id of the shard to read from
+     * @param shardIteratorType position type for the iterator
+     * @param sequenceNumber starting sequence number, only used for AFTER_SEQUENCE_NUMBER and AT_SEQUENCE_NUMBER
+     * @param timestamp starting timestamp, only used for AT_TIMESTAMP
+     * @return the shard iterator, or an empty string if the request failed or returned no result
+     */
+    String getShardIterator (String stream, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+        String shardIterator = "";
+        try {
+            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
+            getShardIteratorRequest.setStreamName(stream);
+            getShardIteratorRequest.setShardId(shardId);
+            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
+            if (shardIteratorType == ShardIteratorType.AFTER_SEQUENCE_NUMBER || shardIteratorType == ShardIteratorType.AT_SEQUENCE_NUMBER) {
+                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
+            } else if (shardIteratorType == ShardIteratorType.AT_TIMESTAMP) {
+                getShardIteratorRequest.setTimestamp(timestamp);
+            }
+            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
+            if (getShardIteratorResult != null) {
+                shardIterator = getShardIteratorResult.getShardIterator();
+            }
+        } catch (Exception e) {
+            // the trailing throwable is logged with its stack trace by slf4j
+            LOG.warn("Exception occurred while getting shardIterator for shard {} shardIteratorType {} sequence number {} timestamp {}",
+                    shardId, shardIteratorType, sequenceNumber, timestamp, e);
+        }
+        LOG.warn("Returning shardIterator {} for shardId {} shardIteratorType {} sequenceNumber {} timestamp {}",
+                shardIterator, shardId, shardIteratorType, sequenceNumber, timestamp);
+        return shardIterator;
+    }
+
+    /**
+     * Fetches records for a shard iterator, limited to the records limit of the connection info.
+     *
+     * @param shardIterator iterator previously obtained from {@link #getShardIterator}
+     * @return the result of the getRecords call; exceptions thrown by the client are propagated to the caller
+     */
+    GetRecordsResult fetchRecords (String shardIterator) {
+        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+        getRecordsRequest.setShardIterator(shardIterator);
+        getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit());
+        return kinesisClient.getRecords(getRecordsRequest);
+    }
+
+    /**
+     * Shuts down the Kinesis client. Does nothing if {@link #initialize()} was never called.
+     */
+    void shutdown () {
+        if (kinesisClient != null) {
+            kinesisClient.shutdown();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
index bdd054f..7f3f024 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -18,32 +18,17 @@
package org.apache.storm.kinesis.spout;
-import com.amazonaws.regions.Region;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
-import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
-import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
-import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.zookeeper.CreateMode;
-import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -53,8 +38,10 @@ import java.util.TreeSet;
class KinesisRecordsManager {
private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
- // zk interaction object
- private transient CuratorFramework curatorFramework;
+ // object handling zk interaction
+ private transient ZKConnection zkConnection;
+ // object handling interaction with kinesis
+ private transient KinesisConnection kinesisConnection;
// Kinesis Spout KinesisConfig object
private transient final KinesisConfig kinesisConfig;
// Queue of records per shard fetched from kinesis and are waiting to be emitted
@@ -78,18 +65,19 @@ class KinesisRecordsManager {
private transient long lastCommitTime;
// boolean to track deactivated state
private transient boolean deactivated;
- private transient AmazonKinesisClient kinesisClient;
KinesisRecordsManager (KinesisConfig kinesisConfig) {
this.kinesisConfig = kinesisConfig;
+ this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+ this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
}
void initialize (int myTaskIndex, int totalTasks) {
deactivated = false;
lastCommitTime = System.currentTimeMillis();
- initializeKinesisClient();
- initializeCurator();
- List<Shard> shards = this.getShards();
+ kinesisConnection.initialize();
+ zkConnection.initialize();
+ List<Shard> shards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
LOG.info("myTaskIndex is " + myTaskIndex);
LOG.info("totalTasks is " + totalTasks);
int i = myTaskIndex;
@@ -149,17 +137,24 @@ class KinesisRecordsManager {
String shardId = kinesisMessageId.getShardId();
BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
LOG.debug("Ack received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+ // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while committing we need to figure out what is the
+ // highest sequence number that can be committed for this shard
if (!ackedPerShard.containsKey(shardId)) {
ackedPerShard.put(shardId, new TreeSet<BigInteger>());
}
ackedPerShard.get(shardId).add(sequenceNumber);
+ // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard(which keeps track of in flight tuples)
if (emittedPerShard.containsKey(shardId)) {
TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
emitted.remove(sequenceNumber);
}
+ // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive coding.
+ // Remove it from failedPerShard anyway
if (failedPerShard.containsKey(shardId)) {
failedPerShard.get(shardId).remove(sequenceNumber);
}
+ // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in failedandFetchedRecords. We use that to
+ // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to clean up memory
if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
failedandFetchedRecords.remove(kinesisMessageId);
@@ -192,8 +187,13 @@ class KinesisRecordsManager {
}
void commit () {
- // Logic for deciding what sequence number to ack is find the highest sequence number from acked called X such that there is no sequence number Y in
- // emitted or failed that satisfies X > Y. For e.g. is acked is 1,3,5. Emitted is 2,4,6 then we can only commit 1 and not 3 because 2 is still pending
+ // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number can be committed to zookeeper.
+ // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack it moves from emittedPerShard to
+ // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to failedPerShard. The failed records will move from
+ // failedPerShard to emittedPerShard when the failed record is emitted again as a retry.
+ // The sequence number to commit is the highest sequence number X in ackedPerShard such that there is no sequence
+ // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For example, if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and
+ // failedPerShard is 3,7 then we can only commit 1 and not 4 because 2 is still pending and 3 has failed
for (String shardId: toEmitPerShard.keySet()) {
if (ackedPerShard.containsKey(shardId)) {
BigInteger commitSequenceNumberBound = null;
@@ -221,8 +221,7 @@ class KinesisRecordsManager {
Map<Object, Object> state = new HashMap<>();
state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString());
LOG.debug("Committing sequence number " + ackedSequenceNumberToCommit.toString() + " for shardId " + shardId);
- String path = getZkPath(shardId);
- commitState(path, state);
+ zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
}
}
}
@@ -232,65 +231,20 @@ class KinesisRecordsManager {
void activate () {
LOG.info("Activate called");
deactivated = false;
- initializeKinesisClient();
+ kinesisConnection.initialize();
}
void deactivate () {
LOG.info("Deactivate called");
deactivated = true;
commit();
- shutdownKinesisClient();
+ kinesisConnection.shutdown();
}
void close () {
commit();
- shutdownKinesisClient();
- shutdownCurator();
- }
-
- private String getZkPath (String shardId) {
- String path = "";
- if (!kinesisConfig.getZkInfo().getZkNode().startsWith("/")) {
- path += "/";
- }
- path += kinesisConfig.getZkInfo().getZkNode();
- if (!kinesisConfig.getZkInfo().getZkNode().endsWith("/")) {
- path += "/";
- }
- path += (kinesisConfig.getStreamName() + "/" + shardId);
- return path;
- }
-
- private void commitState (String path, Map<Object, Object> state) {
- byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
- try {
- if (curatorFramework.checkExists().forPath(path) == null) {
- curatorFramework.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, bytes);
- } else {
- curatorFramework.setData().forPath(path, bytes);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private Map<Object, Object> readState (String path) {
- try {
- Map<Object, Object> state = null;
- byte[] b = null;
- if (curatorFramework.checkExists().forPath(path) != null) {
- b = curatorFramework.getData().forPath(path);
- }
- if (b != null) {
- state = (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
- }
- return state;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ kinesisConnection.shutdown();
+ zkConnection.shutdown();
}
// fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched and are in the failed queue will also
@@ -304,7 +258,7 @@ class KinesisRecordsManager {
LOG.debug("Fetching failed records for shard id :" + kinesisMessageId.getShardId() + " at sequence number " + kinesisMessageId.getSequenceNumber() +
" using shardIterator " + shardIterator);
try {
- GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+ GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
if (getRecordsResult != null) {
List<Record> records = getRecordsResult.getRecords();
LOG.debug("Records size from fetchFailedRecords is " + records.size());
@@ -346,7 +300,7 @@ class KinesisRecordsManager {
String shardIterator = shardIteratorPerShard.get(shardId);
LOG.debug("Fetching new records for shard id :" + shardId + " using shardIterator " + shardIterator + " after sequence number " +
fetchedSequenceNumberPerShard.get(shardId));
- GetRecordsResult getRecordsResult = fetchRecords(shardIterator);
+ GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
if (getRecordsResult != null) {
List<Record> records = getRecordsResult.getRecords();
LOG.debug("Records size from fetchNewRecords is " + records.size());
@@ -376,34 +330,6 @@ class KinesisRecordsManager {
}
}
- private GetRecordsResult fetchRecords (String shardIterator) {
- List<Record> records = new ArrayList<>();
- GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
- getRecordsRequest.setShardIterator(shardIterator);
- getRecordsRequest.setLimit(kinesisConfig.getKinesisConnectionInfo().getRecordsLimit());
- GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
- return getRecordsResult;
- }
-
- private List<Shard> getShards () {
- DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
- describeStreamRequest.setStreamName(kinesisConfig.getStreamName());
- List<Shard> shards = new ArrayList<>();
- String exclusiveStartShardId = null;
- do {
- describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
- DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
- shards.addAll(describeStreamResult.getStreamDescription().getShards());
- if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
- exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
- } else {
- exclusiveStartShardId = null;
- }
- } while ( exclusiveStartShardId != null );
- LOG.info("Number of shards for stream " + kinesisConfig.getStreamName() + " are " + shards.size());
- return shards;
- }
-
private void emitNewRecord (SpoutOutputCollector collector) {
for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
String shardId = entry.getKey();
@@ -450,7 +376,7 @@ class KinesisRecordsManager {
private void initializeFetchedSequenceNumbers () {
for (String shardId : toEmitPerShard.keySet()) {
- Map<Object, Object> state = readState(getZkPath(shardId));
+ Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId);
// if state found for this shard in zk, then set the sequence number in fetchedSequenceNumber
if (state != null) {
Object committedSequenceNumber = state.get("committedSequenceNumber");
@@ -474,7 +400,8 @@ class KinesisRecordsManager {
ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? kinesisConfig.getShardIteratorType() : ShardIteratorType
.AFTER_SEQUENCE_NUMBER);
// Set the shard iterator for last fetched sequence number to start from correct position in shard
- shardIterator = this.getShardIterator(shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig.getTimestamp());
+ shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig
+ .getTimestamp());
if (shardIterator != null && !shardIterator.isEmpty()) {
LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator);
shardIteratorPerShard.put(shardId, shardIterator);
@@ -484,38 +411,14 @@ class KinesisRecordsManager {
private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) {
String shardIterator = null;
// Set the shard iterator for last fetched sequence number to start from correct position in shard
- shardIterator = this.getShardIterator(kinesisMessageId.getShardId(), ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+ shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), kinesisMessageId.getShardId(), ShardIteratorType
+ .AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
if (shardIterator != null && !shardIterator.isEmpty()) {
LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
}
}
- private String getShardIterator (String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
- String shardIterator = "";
- try {
- GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
- getShardIteratorRequest.setStreamName(kinesisConfig.getStreamName());
- getShardIteratorRequest.setShardId(shardId);
- getShardIteratorRequest.setShardIteratorType(shardIteratorType);
- if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
- getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
- } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
- getShardIteratorRequest.setTimestamp(timestamp);
- }
- GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
- if (getShardIteratorResult != null) {
- shardIterator = getShardIteratorResult.getShardIterator();
- }
- } catch (Exception e) {
- LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
- sequenceNumber + " timestamp " + timestamp, e);
- }
- LOG.warn("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
- sequenceNumber + " timestamp" + timestamp);
- return shardIterator;
- }
-
private Long getUncommittedRecordsCount () {
Long result = 0L;
for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) {
@@ -543,24 +446,4 @@ class KinesisRecordsManager {
return fetchRecords;
}
- private void initializeCurator () {
- ZkInfo zkInfo = kinesisConfig.getZkInfo();
- curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
- RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
- curatorFramework.start();
- }
-
- private void initializeKinesisClient () {
- kinesisClient = new AmazonKinesisClient(kinesisConfig.getKinesisConnectionInfo().getCredentialsProvider(), kinesisConfig.getKinesisConnectionInfo().getClientConfiguration());
- kinesisClient.setRegion(Region.getRegion(kinesisConfig.getKinesisConnectionInfo().getRegion()));
- }
-
- private void shutdownCurator () {
- curatorFramework.close();
- }
-
- private void shutdownKinesisClient () {
- kinesisClient.shutdown();
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b59c75c0/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
new file mode 100644
index 0000000..41151d1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Wraps the {@link CuratorFramework} client used by the Kinesis spout to store and read per shard state in zookeeper.
+ * State is kept as a JSON document under the path {@code <zkNode>/<stream>/<shardId>}.
+ *
+ * {@link #initialize()} has to be called before reading or committing state, because the client is only created and
+ * started there and not in the constructor.
+ */
+class ZKConnection {
+
+    // looked up once instead of on every commit; also used for decoding so that both directions agree
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    private final ZkInfo zkInfo;
+    private CuratorFramework curatorFramework;
+
+    /**
+     * @param zkInfo source of the zookeeper url, timeouts, retry settings and the root node for the state
+     */
+    ZKConnection (ZkInfo zkInfo) {
+        this.zkInfo = zkInfo;
+    }
+
+    /**
+     * Creates the curator client from the zk info and starts it.
+     */
+    void initialize () {
+        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(),
+                new RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
+        curatorFramework.start();
+    }
+
+    /**
+     * Writes the state of a shard to zookeeper as JSON, creating the node (and its parents) if it does not exist yet.
+     *
+     * @param stream name of the kinesis stream
+     * @param shardId id of the shard the state belongs to
+     * @param state state to serialize
+     * @throws RuntimeException wrapping any exception raised while talking to zookeeper
+     */
+    void commitState (String stream, String shardId, Map<Object, Object> state) {
+        byte[] bytes = JSONValue.toJSONString(state).getBytes(UTF_8);
+        String path = getZkPath(stream, shardId);
+        try {
+            if (curatorFramework.checkExists().forPath(path) == null) {
+                curatorFramework.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(path, bytes);
+            } else {
+                curatorFramework.setData().forPath(path, bytes);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to commit state to zookeeper path " + path, e);
+        }
+    }
+
+    /**
+     * Reads the state of a shard from zookeeper.
+     *
+     * @param stream name of the kinesis stream
+     * @param shardId id of the shard the state belongs to
+     * @return the parsed state, or null if the node does not exist or holds no data
+     * @throws RuntimeException wrapping any exception raised while talking to zookeeper or casting the parsed data
+     */
+    Map<Object, Object> readState (String stream, String shardId) {
+        String path = getZkPath(stream, shardId);
+        try {
+            if (curatorFramework.checkExists().forPath(path) == null) {
+                return null;
+            }
+            byte[] b = curatorFramework.getData().forPath(path);
+            if (b == null) {
+                return null;
+            }
+            // commitState only ever writes a JSON object, so the parsed value is expected to be a Map
+            @SuppressWarnings("unchecked")
+            Map<Object, Object> state = (Map<Object, Object>) JSONValue.parse(new String(b, UTF_8));
+            return state;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to read state from zookeeper path " + path, e);
+        }
+    }
+
+    /**
+     * Closes the curator client. Does nothing if {@link #initialize()} was never called.
+     */
+    void shutdown () {
+        if (curatorFramework != null) {
+            curatorFramework.close();
+        }
+    }
+
+    /**
+     * Builds the zookeeper path for a shard: the configured zk node, guaranteed to start and end with a single
+     * added "/" where missing, followed by {@code <stream>/<shardId>}.
+     */
+    private String getZkPath (String stream, String shardId) {
+        String zkNode = zkInfo.getZkNode();
+        StringBuilder path = new StringBuilder();
+        if (!zkNode.startsWith("/")) {
+            path.append("/");
+        }
+        path.append(zkNode);
+        if (!zkNode.endsWith("/")) {
+            path.append("/");
+        }
+        path.append(stream).append("/").append(shardId);
+        return path.toString();
+    }
+}