You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/08/03 05:53:58 UTC
[1/3] storm git commit: Merge branch 'STORM-1839' of
https://github.com/priyank5485/storm into STORM-1839
Repository: storm
Updated Branches:
refs/heads/1.x-branch 14924e3ea -> f4480ade9
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
new file mode 100644
index 0000000..1894934
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisBoltTest.java
@@ -0,0 +1,31 @@
+package org.apache.storm.kinesis.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KinesisBoltTest extends BaseRichBolt {
+ protected static final Logger LOG = LoggerFactory.getLogger(KinesisBoltTest.class);
+ private transient OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ LOG.info("input = [" + input + "]");
+ collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
new file mode 100644
index 0000000..2a39463
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/KinesisSpoutTopology.java
@@ -0,0 +1,40 @@
+package org.apache.storm.kinesis.spout.test;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kinesis.spout.CredentialsProviderChain;
+import org.apache.storm.kinesis.spout.ExponentialBackoffRetrier;
+import org.apache.storm.kinesis.spout.KinesisConfig;
+import org.apache.storm.kinesis.spout.KinesisConnectionInfo;
+import org.apache.storm.kinesis.spout.KinesisSpout;
+import org.apache.storm.kinesis.spout.RecordToTupleMapper;
+import org.apache.storm.kinesis.spout.ZkInfo;
+import org.apache.storm.topology.TopologyBuilder;
+
+import java.util.Date;
+
+public class KinesisSpoutTopology {
+ public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
+ String topologyName = args[0];
+ RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+ KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+ 1000);
+ ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+ KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
+ recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+ KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ topologyBuilder.setSpout("spout", kinesisSpout, 3);
+ topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+ Config topologyConfig = new Config();
+ topologyConfig.setDebug(true);
+ topologyConfig.setNumWorkers(3);
+ StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
new file mode 100644
index 0000000..03e024a
--- /dev/null
+++ b/external/storm-kinesis/src/test/java/org/apache/storm/kinesis/spout/test/TestRecordToTupleMapper.java
@@ -0,0 +1,41 @@
+package org.apache.storm.kinesis.spout.test;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.storm.kinesis.spout.RecordToTupleMapper;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestRecordToTupleMapper implements RecordToTupleMapper, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(TestRecordToTupleMapper.class);
+ @Override
+ public Fields getOutputFields() {
+ return new Fields("partitionKey", "sequenceNumber", "data");
+ }
+
+ @Override
+ public List<Object> getTuple(Record record) {
+ CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+ List<Object> tuple = new ArrayList<>();
+ tuple.add(record.getPartitionKey());
+ tuple.add(record.getSequenceNumber());
+ try {
+ String data = decoder.decode(record.getData()).toString();
+ LOG.info("data is " + data);
+ tuple.add(data);
+ } catch (CharacterCodingException e) {
+ e.printStackTrace();
+ LOG.warn("Exception occured. Emitting tuple with empty string data", e);
+ tuple.add("");
+ }
+ LOG.info("Tuple from record is " + tuple);
+ return tuple;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01105c1..da006a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -291,6 +291,7 @@
<module>external/storm-kafka-client</module>
<module>external/storm-opentsdb</module>
<module>external/storm-kafka-monitor</module>
+ <module>external/storm-kinesis</module>
</modules>
<dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 646bef0..0471501 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -107,6 +107,20 @@
</includes>
</fileSet>
<fileSet>
+ <directory>${project.basedir}/../../external/storm-kinesis/target</directory>
+ <outputDirectory>external/storm-kinesis</outputDirectory>
+ <includes>
+ <include>storm*jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-kinesis</directory>
+ <outputDirectory>external/storm-kinesis</outputDirectory>
+ <includes>
+ <include>README.*</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>${project.basedir}/../../external/storm-hdfs/target</directory>
<outputDirectory>external/storm-hdfs</outputDirectory>
<includes>
[3/3] storm git commit: Added STORM-1839 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1839 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f4480ade
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f4480ade
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f4480ade
Branch: refs/heads/1.x-branch
Commit: f4480ade9aa124152b4946b9c6b9d90103128300
Parents: eed4b42
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Aug 2 21:59:42 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Aug 2 21:59:42 2016 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f4480ade/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0e181d8..ada24e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-1839: Storm spout implementation for Amazon Kinesis Streams.
* STORM-1876: Option to build storm-kafka and storm-kafka-client with different kafka client version
* STORM-2000: Package storm-opentsdb as part of external dir in installation
* STORM-1989: X-Frame-Options support for Storm UI
[2/3] storm git commit: Merge branch 'STORM-1839' of
https://github.com/priyank5485/storm into STORM-1839
Posted by sr...@apache.org.
Merge branch 'STORM-1839' of https://github.com/priyank5485/storm into STORM-1839
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eed4b423
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eed4b423
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eed4b423
Branch: refs/heads/1.x-branch
Commit: eed4b42358faf0eef2585a402fbe342b10a2edc3
Parents: 14924e3
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Aug 2 21:35:10 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Aug 2 21:59:12 2016 -0700
----------------------------------------------------------------------
external/storm-kinesis/README.md | 140 ++++++
external/storm-kinesis/pom.xml | 69 +++
.../kinesis/spout/CredentialsProviderChain.java | 39 ++
.../spout/ExponentialBackoffRetrier.java | 164 +++++++
.../spout/FailedMessageRetryHandler.java | 48 ++
.../storm/kinesis/spout/KinesisConfig.java | 133 ++++++
.../storm/kinesis/spout/KinesisConnection.java | 108 +++++
.../kinesis/spout/KinesisConnectionInfo.java | 111 +++++
.../storm/kinesis/spout/KinesisMessageId.java | 73 +++
.../kinesis/spout/KinesisRecordsManager.java | 449 +++++++++++++++++++
.../storm/kinesis/spout/KinesisSpout.java | 86 ++++
.../kinesis/spout/RecordToTupleMapper.java | 38 ++
.../storm/kinesis/spout/ZKConnection.java | 95 ++++
.../org/apache/storm/kinesis/spout/ZkInfo.java | 118 +++++
.../kinesis/spout/test/KinesisBoltTest.java | 31 ++
.../spout/test/KinesisSpoutTopology.java | 40 ++
.../spout/test/TestRecordToTupleMapper.java | 41 ++
pom.xml | 1 +
storm-dist/binary/src/main/assembly/binary.xml | 14 +
19 files changed, 1798 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/README.md b/external/storm-kinesis/README.md
new file mode 100644
index 0000000..f163a54
--- /dev/null
+++ b/external/storm-kinesis/README.md
@@ -0,0 +1,140 @@
+# Storm Kinesis Spout
+Provides core storm spout for consuming data from a stream in Amazon Kinesis Streams. It stores the sequence numbers that can be committed in zookeeper and
+starts consuming records after that sequence number on restart by default. Below is the code sample to create a sample topology that uses the spout. Each
+object used in configuring the spout is explained below. Ideally, the number of spout tasks should be equal to number of shards in kinesis. However each task
+can read from more than one shard.
+
+```java
+public class KinesisSpoutTopology {
+ public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
+ String topologyName = args[0];
+ RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
+ KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
+ 1000);
+ ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
+ KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
+ recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
+ KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ topologyBuilder.setSpout("spout", kinesisSpout, 3);
+ topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
+ Config topologyConfig = new Config();
+ topologyConfig.setDebug(true);
+ topologyConfig.setNumWorkers(3);
+ StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
+ }
+}
+```
+As you can see above the spout takes an object of KinesisConfig in its constructor. The constructor of KinesisConfig takes 8 objects as explained below.
+
+#### `String` streamName
+name of kinesis stream to consume data from
+
+#### `ShardIteratorType` shardIteratorType
+3 types are supported - TRIM_HORIZON (beginning of shard), LATEST and AT_TIMESTAMP. By default this argument is ignored if state for the shards
+is found in zookeeper, so it only applies the first time a topology is started. If you want it to apply in subsequent runs of the topology, you
+will need to clear the state of the zookeeper node used for storing sequence numbers
+
+#### `RecordToTupleMapper` recordToTupleMapper
+an implementation of `RecordToTupleMapper` interface that spout will call to convert a kinesis record to a storm tuple. It has two methods. getOutputFields
+tells the spout the fields that will be present in the tuple emitted from the getTuple method. If getTuple returns null, the record will be acked
+```java
+ Fields getOutputFields ();
+ List<Object> getTuple (Record record);
+```
+
+#### `Date` timestamp
+used in conjunction with the AT_TIMESTAMP shardIteratorType argument. This will make the spout fetch records from kinesis starting at that time or later. The
+time used by kinesis is the server side time that kinesis associates with the record
+
+#### `FailedMessageRetryHandler` failedMessageRetryHandler
+an implementation of the `FailedMessageRetryHandler` interface. By default this module provides an implementation that supports an exponential backoff retry
+mechanism for failed messages. That implementation has two constructors. The default no-args constructor configures the first retry at 100 milliseconds and
+subsequent retries at Math.pow(2, i-1) seconds, where i is the retry number in the range 2 to Long.MAX_VALUE. 2 represents the base for the exponential function in seconds.
+The other constructor takes the retry interval in millis for the first retry as the first argument, the base for the exponential function in seconds as the second argument and the maximum number of
+retries as the third argument. The methods of this interface, and how they interact with the spout, are explained below
+```java
+ boolean failed (KinesisMessageId messageId);
+ KinesisMessageId getNextFailedMessageToRetry ();
+ void failedMessageEmitted (KinesisMessageId messageId);
+ void acked (KinesisMessageId messageId);
+```
+failed method will be called on every tuple that failed in the spout. It should return true if that failed message is scheduled to be retried, false otherwise.
+
+getNextFailedMessageToRetry method will be called the first thing every time a spout wants to emit a tuple. It should return a message that should be retried
+if any or null otherwise. Note that it can return null in the case it does not have any message to retry as of that moment. However, it should eventually
+return every message for which it returned true when failed method was called for that message
+
+failedMessageEmitted will be called if spout successfully manages to get the record from kinesis and emit it. If not, the implementation should return the same
+message when getNextFailedMessageToRetry is called again
+
+acked will be called once the failed message has been re-emitted and successfully acked by the spout. If it fails again, failed will be called again
+
+#### `ZkInfo` zkInfo
+an object encapsulating information for zookeeper interaction. The constructor takes zkUrl as first argument which is a comma separated string of zk host and
+port, zkNode as second that will be used as the root node for storing committed sequence numbers, session timeout as third in milliseconds, connection timeout
+as fourth in milliseconds, commit interval as fifth in milliseconds for committing sequence numbers to zookeeper, retry attempts as sixth for zk client
+connection retry attempts, retry interval as seventh in milliseconds for time to wait before retrying to connect.
+
+#### `KinesisConnectionInfo` kinesisConnectionInfo
+an object that captures arguments for connecting to kinesis using kinesis client. It has a constructor that takes an implementation of `AWSCredentialsProvider`
+as first argument. This module provides an implementation called `CredentialsProviderChain` that allows the spout to authenticate with kinesis using one of
+the 5 mechanisms in this order - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`,
+`InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`. It takes an object of `ClientConfiguration` as second argument for configuring the kinesis
+client, `Regions` as third argument that sets the region to connect to on the client and recordsLimit as the fourth argument which represents the maximum number
+of records kinesis client will retrieve for every GetRecords request. This limit should be carefully chosen based on the size of the record, kinesis
+throughput rate limits and per tuple latency in storm for the topology. Also, if one task will be reading from more than one shard, that will also affect
+the choice of limit argument
+
+#### `Long` maxUncommittedRecords
+this represents the maximum number of uncommitted sequence numbers allowed per task. Once this number is reached, the spout will not fetch any new records from
+kinesis. Uncommitted sequence numbers are defined as the sum of all the messages for a task that have not been committed to zookeeper. This is different from
+topology level max pending messages. For example, suppose this value is set to 10 and the spout has emitted sequence numbers 1 to 10, with sequence number 1 still pending
+and 2 to 10 acked. In that case the number of uncommitted sequence numbers is 10, since no sequence number in the range 1 to 10 can be committed to zk.
+However, storm can still call nextTuple on the spout because there is only 1 pending message
+
+### Maven dependencies
+The AWS SDK version this was tested with is 1.10.77
+
+```xml
+ <dependencies>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>${aws-java-sdk.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ </dependency>
+ </dependencies>
+```
+
+# Future Work
+Handle merging or splitting of shards in kinesis, Trident spout implementation and metrics
+
+## Committer Sponsors
+
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml
new file mode 100644
index 0000000..f1872dc
--- /dev/null
+++ b/external/storm-kinesis/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>storm-kinesis</artifactId>
+ <name>storm-kinesis</name>
+
+ <properties>
+ <aws-java-sdk.version>1.10.77</aws-java-sdk.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>${aws-java-sdk.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
+
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
new file mode 100644
index 0000000..4287ae0
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+
+/**
+ * Class representing chain of mechanisms that will be used in order to connect to kinesis
+ */
+public class CredentialsProviderChain extends AWSCredentialsProviderChain {
+ public CredentialsProviderChain () {
+ super(new EnvironmentVariableCredentialsProvider(),
+ new SystemPropertiesCredentialsProvider(),
+ new ClasspathPropertiesFileCredentialsProvider(),
+ new InstanceProfileCredentialsProvider(),
+ new ProfileCredentialsProvider());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
new file mode 100644
index 0000000..2a702f8
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+ // Wait interfal for retrying after first failure
+ private final Long initialDelayMillis;
+ // Base for exponential function in seconds for retrying for second, third and so on failures
+ private final Long baseSeconds;
+ // Maximum number of retries
+ private final Long maxRetries;
+ // map to track number of failures for each kinesis message that failed
+ private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+ // map to track next retry time for each kinesis message that failed
+ private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+ // sorted set of records to be retrued based on retry time. earliest retryTime record comes first
+ private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
+
+ /**
+ * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) secs for
+ * retry i where i = 2,3,
+ */
+ public ExponentialBackoffRetrier () {
+ this(100L, 2L, Long.MAX_VALUE);
+ }
+
+ /**
+ *
+ * @param initialDelayMillis delay in milliseconds for first retry
+ * @param baseSeconds base for exponent function in seconds
+ * @param maxRetries maximum number of retries before the record is discarded/acked
+ */
+ public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
+ this.initialDelayMillis = initialDelayMillis;
+ this.baseSeconds = baseSeconds;
+ this.maxRetries = maxRetries;
+ validate();
+ }
+
+ private void validate () {
+ if (initialDelayMillis < 0) {
+ throw new IllegalArgumentException("initialDelayMillis cannot be negative." );
+ }
+ if (baseSeconds < 0) {
+ throw new IllegalArgumentException("baseSeconds cannot be negative.");
+ }
+ if (maxRetries < 0) {
+ throw new IllegalArgumentException("maxRetries cannot be negative.");
+ }
+ }
+ @Override
+ public boolean failed(KinesisMessageId messageId) {
+ LOG.debug("Handling failed message " + messageId);
+ // if maxRetries is 0, dont retry and return false as per interface contract
+ if (maxRetries == 0) {
+ LOG.warn("maxRetries set to 0. Hence not queueing " + messageId);
+ return false;
+ }
+ // if first failure add it to the count map
+ if (!failCounts.containsKey(messageId)) {
+ failCounts.put(messageId, 0L);
+ }
+ // increment the fail count as we started with 0
+ Long failCount = failCounts.get(messageId);
+ failCounts.put(messageId, ++failCount);
+ // if fail count is greater than maxRetries, discard or ack. for e.g. for maxRetries 3, 4 failures are allowed at maximum
+ if (failCount > maxRetries) {
+ LOG.warn("maxRetries reached so dropping " + messageId);
+ failCounts.remove(messageId);
+ return false;
+ }
+ // if reached so far, add it to the set of messages waiting to be retried with next retry time based on how many times it failed
+ retryTimes.put(messageId, getRetryTime(failCount));
+ retryMessageSet.add(messageId);
+ LOG.debug("Scheduled " + messageId + " for retry at " + retryTimes.get(messageId) + " and retry attempt " + failCount);
+ return true;
+ }
+
+ @Override
+ public void acked(KinesisMessageId messageId) {
+ // message was acked after being retried. so clear the state for that message
+ LOG.debug("Ack received for " + messageId + ". Hence cleaning state.");
+ failCounts.remove(messageId);
+ }
+
+ @Override
+ public KinesisMessageId getNextFailedMessageToRetry() {
+ KinesisMessageId result = null;
+ // return the first message to be retried from the set. It will return the message with the earliest retry time <= current time
+ if (!retryMessageSet.isEmpty() ) {
+ result = retryMessageSet.first();
+ if (!(retryTimes.get(result) <= System.nanoTime())) {
+ result = null;
+ }
+ }
+ LOG.debug("Returning " + result + " to spout for retrying.");
+ return result;
+ }
+
+ @Override
+ public void failedMessageEmitted(KinesisMessageId messageId) {
+ // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and wait for its ack or fail
+ // but still keep it in counts map to retry again on failure or remove on ack
+ LOG.debug("Spout says " + messageId + " emitted. Hence removing it from queue and wait for its ack or fail");
+ retryMessageSet.remove(messageId);
+ retryTimes.remove(messageId);
+ }
+
+ // private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to Long.MAX_VALUE)
+ private Long getRetryTime (Long retryNum) {
+ Long retryTime = System.nanoTime();
+ Long nanoMultiplierForMillis = 1000000L;
+ // if first retry then retry time = current time + initial delay
+ if (retryNum == 1) {
+ retryTime += initialDelayMillis * nanoMultiplierForMillis;
+ } else {
+ // else use the exponential backoff logic and handle long overflow
+ Long maxValue = Long.MAX_VALUE;
+ double time = Math.pow(baseSeconds, retryNum - 1) * 1000 * nanoMultiplierForMillis;
+ // if delay or delay + current time are bigger than long max value
+ // second predicate for or condition uses the fact that long addition over the limit circles back
+ if ((time >= maxValue.doubleValue()) || ((retryTime + (long) time) < retryTime)) {
+ retryTime = maxValue;
+ } else {
+ retryTime += (long) time;
+ }
+ }
+ return retryTime;
+ }
+
+ private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> {
+ @Override
+ public int compare(KinesisMessageId o1, KinesisMessageId o2) {
+ return retryTimes.get(o1).compareTo(retryTimes.get(o2));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
new file mode 100644
index 0000000..bb0e450
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+/**
+ * Pluggable policy deciding whether, and when, failed Kinesis messages are re-emitted by the spout.
+ * <p>
+ * Implementations are Serializable because they are carried inside the spout's configuration.
+ */
+public interface FailedMessageRetryHandler extends Serializable {
+    /**
+     * Called when the tuple for {@code messageId} failed in the spout.
+     *
+     * @param messageId id of the message whose tuple failed
+     * @return true if this failed message was scheduled to be retried, false otherwise. When false is returned the
+     *         spout gives up on the message and treats it as acked, so it no longer blocks committing the shard.
+     */
+    boolean failed (KinesisMessageId messageId);
+
+    /**
+     * Called when the tuple for {@code messageId} succeeded/was acked in the spout.
+     * <p>
+     * Implementations should not assume they see every ack: the records manager only forwards acks for messages
+     * that previously failed and were re-emitted.
+     *
+     * @param messageId id of the message that was acked
+     */
+    void acked (KinesisMessageId messageId);
+
+    /**
+     * Returns the id of the next failed message that is due for a retry.
+     * <p>
+     * The spout may be unable to re-emit the returned message (for example if its record could not be fetched).
+     * In that case {@link #failedMessageEmitted(KinesisMessageId)} is not called, and the same id may be returned
+     * again on a later call.
+     *
+     * @return id of the next message to retry, or null if no failed message is due
+     */
+    KinesisMessageId getNextFailedMessageToRetry ();
+
+    /**
+     * Called after the message returned by the last call to {@link #getNextFailedMessageToRetry()} was actually
+     * emitted (retried) by the spout.
+     *
+     * @param messageId id of the message that was re-emitted
+     */
+    void failedMessageEmitted (KinesisMessageId messageId);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
new file mode 100644
index 0000000..744aef5
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Configuration for the Kinesis spout.
+ * <p>
+ * All fields are final and every argument is validated at construction time. An invalid combination results in
+ * an {@link IllegalArgumentException}.
+ */
+public class KinesisConfig implements Serializable {
+    // kinesis stream name to read from
+    private final String streamName;
+    // shard iterator type based on kinesis api - only AT_TIMESTAMP, LATEST and TRIM_HORIZON are supported
+    private final ShardIteratorType shardIteratorType;
+    // implementation for converting a Kinesis record to a storm tuple
+    private final RecordToTupleMapper recordToTupleMapper;
+    // timestamp to be used for shardIteratorType AT_TIMESTAMP - can be null otherwise.
+    // Held as a private copy because java.util.Date is mutable.
+    private final Date timestamp;
+    // implementation for handling the failed messages retry logic
+    private final FailedMessageRetryHandler failedMessageRetryHandler;
+    // object capturing all zk related information for storing committed sequence numbers
+    private final ZkInfo zkInfo;
+    // object representing information on parameters to use while connecting to kinesis using kinesis client
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    // upper bound on the number of records that are still not committed to zk; once reached the spout stops emitting new records.
+    // e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still can't be committed to zk because 1 is still in the
+    // failed set. Without this bound the acked queue could grow indefinitely; topology max pending does not help since from
+    // storm's view those records are acked.
+    private final Long maxUncommittedRecords;
+
+    /**
+     * @param streamName kinesis stream to read from; must be non-null and non-empty
+     * @param shardIteratorType one of AT_TIMESTAMP, LATEST or TRIM_HORIZON
+     * @param recordToTupleMapper converts kinesis records to storm tuples; required
+     * @param timestamp start timestamp; required when shardIteratorType is AT_TIMESTAMP, may be null otherwise.
+     *                  The value is copied, so later changes to the caller's Date have no effect.
+     * @param failedMessageRetryHandler retry policy for failed messages; required
+     * @param zkInfo zookeeper settings for storing committed sequence numbers; required
+     * @param kinesisConnectionInfo kinesis client settings; required
+     * @param maxUncommittedRecords maximum number of uncommitted records; must be at least 1
+     * @throws IllegalArgumentException if any argument is invalid
+     */
+    public KinesisConfig(String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp,
+                         FailedMessageRetryHandler failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo,
+                         Long maxUncommittedRecords) {
+        this.streamName = streamName;
+        this.shardIteratorType = shardIteratorType;
+        this.recordToTupleMapper = recordToTupleMapper;
+        // defensive copy: the caller must not be able to change this configuration after validation
+        this.timestamp = timestamp == null ? null : new Date(timestamp.getTime());
+        this.failedMessageRetryHandler = failedMessageRetryHandler;
+        this.zkInfo = zkInfo;
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+        this.maxUncommittedRecords = maxUncommittedRecords;
+        validate();
+    }
+
+    private void validate () {
+        if (streamName == null || streamName.length() < 1) {
+            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
+        }
+        if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST +
+                    "," + ShardIteratorType.TRIM_HORIZON);
+        }
+        if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) {
+            throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
+        }
+        if (recordToTupleMapper == null) {
+            throw new IllegalArgumentException("recordToTupleMapper cannot be null");
+        }
+        if (failedMessageRetryHandler == null) {
+            throw new IllegalArgumentException("failedMessageRetryHandler cannot be null");
+        }
+        if (zkInfo == null) {
+            throw new IllegalArgumentException("zkInfo cannot be null");
+        }
+        if (kinesisConnectionInfo == null) {
+            throw new IllegalArgumentException("kinesisConnectionInfo cannot be null");
+        }
+        if (maxUncommittedRecords == null || maxUncommittedRecords < 1) {
+            throw new IllegalArgumentException("maxUncommittedRecords has to be a positive integer");
+        }
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public ShardIteratorType getShardIteratorType() {
+        return shardIteratorType;
+    }
+
+    public RecordToTupleMapper getRecordToTupleMapper() {
+        return recordToTupleMapper;
+    }
+
+    /**
+     * @return a copy of the configured timestamp, or null if none was configured
+     */
+    public Date getTimestamp() {
+        return timestamp == null ? null : new Date(timestamp.getTime());
+    }
+
+    public FailedMessageRetryHandler getFailedMessageRetryHandler () {
+        return failedMessageRetryHandler;
+    }
+
+    public ZkInfo getZkInfo () {
+        return zkInfo;
+    }
+
+    public KinesisConnectionInfo getKinesisConnectionInfo () {
+        return kinesisConnectionInfo;
+    }
+
+    public Long getMaxUncommittedRecords () {
+        return maxUncommittedRecords;
+    }
+
+    @Override
+    public String toString() {
+        return "KinesisConfig{" +
+                "streamName='" + streamName + '\'' +
+                ", shardIteratorType=" + shardIteratorType +
+                ", recordToTupleMapper=" + recordToTupleMapper +
+                ", timestamp=" + timestamp +
+                ", zkInfo=" + zkInfo +
+                ", kinesisConnectionInfo=" + kinesisConnectionInfo +
+                ", failedMessageRetryHandler =" + failedMessageRetryHandler +
+                ", maxUncommittedRecords=" + maxUncommittedRecords +
+                '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
new file mode 100644
index 0000000..dfd9049
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Thin wrapper around the AWS Kinesis client used by the spout: shard discovery, shard-iterator lookup and
+ * record fetching.
+ */
+class KinesisConnection {
+    // logger named after this class (previously, by copy-paste, after KinesisRecordsManager)
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisConnection.class);
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    private AmazonKinesisClient kinesisClient;
+
+    KinesisConnection (KinesisConnectionInfo kinesisConnectionInfo) {
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+    }
+
+    /**
+     * Creates a new kinesis client from the connection info's credentials provider, client configuration and region.
+     * Must be called before any other method that talks to kinesis.
+     */
+    void initialize () {
+        kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), kinesisConnectionInfo.getClientConfiguration());
+        kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion()));
+    }
+
+    /**
+     * Lists all shards of a stream, following DescribeStream pagination.
+     *
+     * @param stream name of the kinesis stream
+     * @return every shard reported for the stream, in the order returned by kinesis
+     */
+    List<Shard> getShardsForStream (String stream) {
+        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
+        describeStreamRequest.setStreamName(stream);
+        List<Shard> shards = new ArrayList<>();
+        String exclusiveStartShardId = null;
+        do {
+            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
+            DescribeStreamResult describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+            shards.addAll(describeStreamResult.getStreamDescription().getShards());
+            // hasMoreShards is a boxed Boolean; treat a missing flag as "no more pages" instead of unboxing null
+            if (Boolean.TRUE.equals(describeStreamResult.getStreamDescription().getHasMoreShards()) && !shards.isEmpty()) {
+                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
+            } else {
+                exclusiveStartShardId = null;
+            }
+        } while (exclusiveStartShardId != null);
+        LOG.info("Number of shards for stream " + stream + " are " + shards.size());
+        return shards;
+    }
+
+    /**
+     * Obtains a shard iterator.
+     *
+     * @param stream stream name
+     * @param shardId shard to iterate
+     * @param shardIteratorType iterator type; sequenceNumber is used only for AT/AFTER_SEQUENCE_NUMBER and timestamp only for AT_TIMESTAMP
+     * @param sequenceNumber starting sequence number, when applicable
+     * @param timestamp starting timestamp, when applicable
+     * @return the shard iterator; an empty string if the request threw (the exception is logged), or whatever the
+     *         service returned otherwise
+     */
+    String getShardIterator (String stream, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+        String shardIterator = "";
+        try {
+            GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
+            getShardIteratorRequest.setStreamName(stream);
+            getShardIteratorRequest.setShardId(shardId);
+            getShardIteratorRequest.setShardIteratorType(shardIteratorType);
+            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+                getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
+            } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
+                getShardIteratorRequest.setTimestamp(timestamp);
+            }
+            GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
+            if (getShardIteratorResult != null) {
+                shardIterator = getShardIteratorResult.getShardIterator();
+            }
+        } catch (Exception e) {
+            LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
+                    sequenceNumber + " timestamp " + timestamp, e);
+        }
+        // routine outcome of every iterator lookup, so log at debug rather than warn
+        LOG.debug("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
+                sequenceNumber + " timestamp " + timestamp);
+        return shardIterator;
+    }
+
+    /**
+     * Fetches one batch of records, capped at the configured records limit.
+     *
+     * @param shardIterator iterator positioning the read
+     * @return the raw GetRecords result from kinesis
+     */
+    GetRecordsResult fetchRecords (String shardIterator) {
+        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
+        getRecordsRequest.setShardIterator(shardIterator);
+        getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit());
+        return kinesisClient.getRecords(getRecordsRequest);
+    }
+
+    /** Shuts down the kinesis client; a no-op if {@link #initialize()} was never called. */
+    void shutdown () {
+        if (kinesisClient != null) {
+            kinesisClient.shutdown();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
new file mode 100644
index 0000000..67ca29f
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Serializable holder for the settings needed to build a kinesis client.
+ * <p>
+ * The credentials provider and client configuration are not necessarily Serializable, so they are stored as
+ * Kryo-serialized bytes. They are lazily deserialized on first access.
+ */
+public class KinesisConnectionInfo implements Serializable {
+    private final byte[] serializedKinesisCredsProvider;
+    private final byte[] serializedkinesisClientConfig;
+    private final Integer recordsLimit;
+    private final Regions region;
+
+    // lazily rebuilt from the serialized bytes after this object has been (de)serialized
+    private transient AWSCredentialsProvider credentialsProvider;
+    private transient ClientConfiguration clientConfiguration;
+
+    /**
+     *
+     * @param credentialsProvider implementation to provide credentials to connect to kinesis
+     * @param clientConfiguration client configuration to pass to kinesis client
+     * @param region region to connect to
+     * @param recordsLimit max records to be fetched in a getRecords request to kinesis
+     * @throws IllegalArgumentException if recordsLimit is null or not positive, or region is null
+     */
+    public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) {
+        if (recordsLimit == null || recordsLimit <= 0) {
+            throw new IllegalArgumentException("recordsLimit has to be a positive integer");
+        }
+        if (region == null) {
+            throw new IllegalArgumentException("region cannot be null");
+        }
+        serializedKinesisCredsProvider = getKryoSerializedBytes(credentialsProvider);
+        serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration);
+        this.recordsLimit = recordsLimit;
+        this.region = region;
+    }
+
+    public Integer getRecordsLimit() {
+        return recordsLimit;
+    }
+
+    public AWSCredentialsProvider getCredentialsProvider() {
+        if (credentialsProvider == null) {
+            credentialsProvider = (AWSCredentialsProvider) this.getKryoDeserializedObject(serializedKinesisCredsProvider);
+        }
+        return credentialsProvider;
+    }
+
+    public ClientConfiguration getClientConfiguration() {
+        if (clientConfiguration == null) {
+            clientConfiguration = (ClientConfiguration) this.getKryoDeserializedObject(serializedkinesisClientConfig);
+        }
+        return clientConfiguration;
+    }
+
+    public Regions getRegion() {
+        return region;
+    }
+
+    // StdInstantiatorStrategy lets Kryo instantiate classes that have no no-arg constructor
+    private byte[] getKryoSerializedBytes (final Object obj) {
+        final Kryo kryo = new Kryo();
+        final ByteArrayOutputStream os = new ByteArrayOutputStream();
+        final Output output = new Output(os);
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+        kryo.writeClassAndObject(output, obj);
+        output.flush();
+        return os.toByteArray();
+    }
+
+    private Object getKryoDeserializedObject (final byte[] ser) {
+        final Kryo kryo = new Kryo();
+        final Input input = new Input(new ByteArrayInputStream(ser));
+        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+        return kryo.readClassAndObject(input);
+    }
+
+    @Override
+    public String toString() {
+        // Do not print the serialized bytes themselves: they are a Kryo dump of the credentials provider and may
+        // contain secrets, and this string is embedded in KinesisConfig.toString(), which can end up in logs.
+        return "KinesisConnectionInfo{" +
+                "serializedKinesisCredsProvider=<" + serializedKinesisCredsProvider.length + " bytes>" +
+                ", serializedkinesisClientConfig=<" + serializedkinesisClientConfig.length + " bytes>" +
+                ", region=" + region +
+                ", recordsLimit=" + recordsLimit +
+                '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
new file mode 100644
index 0000000..dd239f1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+/**
+ * Identifies one kinesis record handed to storm: stream name, shard id and the record's sequence number.
+ * <p>
+ * Has value semantics: two ids are equal when all three components are equal, and null components are allowed.
+ */
+public class KinesisMessageId {
+    private final String streamName;
+    private final String shardId;
+    private final String sequenceNumber;
+
+    KinesisMessageId (String streamName, String shardId, String sequenceNumber) {
+        this.streamName = streamName;
+        this.shardId = shardId;
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    public String getStreamName () {
+        return this.streamName;
+    }
+
+    public String getShardId () {
+        return this.shardId;
+    }
+
+    public String getSequenceNumber () {
+        return this.sequenceNumber;
+    }
+
+    @Override
+    public String toString () {
+        StringBuilder text = new StringBuilder("KinesisMessageId{");
+        text.append("streamName='").append(streamName).append('\'');
+        text.append(", shardId='").append(shardId).append('\'');
+        text.append(", sequenceNumber='").append(sequenceNumber).append('\'');
+        return text.append('}').toString();
+    }
+
+    @Override
+    public boolean equals (Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        KinesisMessageId other = (KinesisMessageId) o;
+        return sameValue(streamName, other.streamName)
+                && sameValue(shardId, other.shardId)
+                && sameValue(sequenceNumber, other.sequenceNumber);
+    }
+
+    @Override
+    public int hashCode () {
+        // classic 31-multiplier combination of the three components, null counting as 0
+        int hash = hashOf(streamName);
+        hash = hash * 31 + hashOf(shardId);
+        hash = hash * 31 + hashOf(sequenceNumber);
+        return hash;
+    }
+
+    // null-safe String equality
+    private static boolean sameValue (String a, String b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    // null-safe String hash (0 for null)
+    private static int hashOf (String value) {
+        return value == null ? 0 : value.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
new file mode 100644
index 0000000..7f3f024
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
+ // NOTE(review): this class does not implement Serializable, so the transient modifiers below have no effect.
+ // object handling zk interaction
+ private transient ZKConnection zkConnection;
+ // object handling interaction with kinesis
+ private transient KinesisConnection kinesisConnection;
+ // Kinesis Spout KinesisConfig object
+ private transient final KinesisConfig kinesisConfig;
+ // Queue of records per shard fetched from kinesis and waiting to be emitted. Its key set is also the set of shards
+ // assigned to this task: initialize() creates one entry per assigned shard, commit() iterates over these keys, and
+ // next() treats an empty map as "no shard assigned".
+ private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
+ // Records re-fetched from kinesis for failed messages, keyed by message id. ack() removes an entry and notifies the
+ // retry handler, so the entry outlives the re-emit until the message is acked.
+ private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>();
+ // Sequence numbers per shard that have been emitted and are still in flight. A TreeSet because entries are removed on
+ // ack or fail, and commit() needs the smallest in-flight sequence number (first()) to bound what can be committed.
+ // Logic explained in commit()
+ private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
+ // sorted acked sequence numbers - needed to figure out what sequence number can be committed
+ private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
+ // sorted failed sequence numbers (scheduled for retry) - needed to figure out what sequence number can be committed
+ private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
+ // shard iterator corresponding to position in shard for new messages
+ private transient Map<String, String> shardIteratorPerShard = new HashMap<>();
+ // last fetched sequence number corresponding to position in shard
+ private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>();
+ // shard iterator corresponding to position in shard for failed messages
+ private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>();
+ // wall-clock time (System.currentTimeMillis()) of the last commit(); used to decide when to commit to zk again
+ private transient long lastCommitTime;
+ // true between deactivate() and activate(); while set, ack() and fail() commit to zk on every call
+ private transient boolean deactivated;
+
+    /**
+     * Builds the ZooKeeper and kinesis connection wrappers from the spout configuration.
+     * Both wrappers are initialized later, in {@link #initialize(int, int)}.
+     */
+    KinesisRecordsManager (KinesisConfig kinesisConfig) {
+        this.kinesisConfig = kinesisConfig;
+        this.zkConnection = new ZKConnection(this.kinesisConfig.getZkInfo());
+        this.kinesisConnection = new KinesisConnection(this.kinesisConfig.getKinesisConnectionInfo());
+    }
+
+    /**
+     * Connects to kinesis and zookeeper, then claims this task's share of the stream's shards.
+     * <p>
+     * Shards are assigned round-robin: this task takes the shards at list positions myTaskIndex,
+     * myTaskIndex + totalTasks, myTaskIndex + 2 * totalTasks, and so on.
+     *
+     * @param myTaskIndex index of this spout task
+     * @param totalTasks number of spout tasks sharing the stream
+     */
+    void initialize (int myTaskIndex, int totalTasks) {
+        deactivated = false;
+        lastCommitTime = System.currentTimeMillis();
+        kinesisConnection.initialize();
+        zkConnection.initialize();
+        List<Shard> allShards = kinesisConnection.getShardsForStream(kinesisConfig.getStreamName());
+        LOG.info("myTaskIndex is " + myTaskIndex);
+        LOG.info("totalTasks is " + totalTasks);
+        for (int position = myTaskIndex; position < allShards.size(); position += totalTasks) {
+            String assignedShardId = allShards.get(position).getShardId();
+            LOG.info("Shard id " + assignedShardId + " assigned to task " + myTaskIndex);
+            toEmitPerShard.put(assignedShardId, new LinkedList<Record>());
+        }
+        initializeFetchedSequenceNumbers();
+        refreshShardIteratorsForNewRecords();
+    }
+
+    /**
+     * Emits at most one tuple: a due retry if there is one, otherwise a new record.
+     * <p>
+     * Commits to zk first when shouldCommit() says so. New records are not emitted once the uncommitted-records cap
+     * is reached, or when no shard is assigned to this task.
+     *
+     * @param collector spout collector used to emit the tuple
+     */
+    void next (SpoutOutputCollector collector) {
+        if (shouldCommit()) {
+            commit();
+        }
+        KinesisMessageId failedMessageId = kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
+        if (failedMessageId != null) {
+            // if the retry service returns a message that is not in failed set then ignore it. should never happen
+            BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber());
+            if (failedPerShard.containsKey(failedMessageId.getShardId()) && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) {
+                if (!failedandFetchedRecords.containsKey(failedMessageId)) {
+                    fetchFailedRecords(failedMessageId);
+                }
+                if (emitFailedRecord(collector, failedMessageId)) {
+                    failedPerShard.get(failedMessageId.getShardId()).remove(failedSequenceNumber);
+                    kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
+                    return;
+                } else {
+                    LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId + ". This can happen a few times but should not happen " +
+                            "infinitely");
+                }
+            } else {
+                LOG.warn("failedPerShard does not contain " + failedMessageId + ". This should never happen.");
+            }
+        }
+        LOG.debug("No failed record to emit for now. Hence will try to emit new records");
+        // The two early returns below are steady states rather than anomalies, and this method is expected to be
+        // driven from the spout's nextTuple(), which storm calls repeatedly. Logging them at warn would flood the logs
+        // on every call, so they are logged at debug.
+        // if maximum uncommitted records count has been reached, don't emit any new records and return
+        if (!(getUncommittedRecordsCount() < kinesisConfig.getMaxUncommittedRecords())) {
+            LOG.debug("maximum uncommitted records count has reached. so not emitting any new records and returning");
+            return;
+        }
+        // early return as no shard is assigned - probably because number of executors > number of shards
+        if (toEmitPerShard.isEmpty()) {
+            LOG.debug("No shard is assigned to this task. Hence not emitting any tuple.");
+            return;
+        }
+
+        if (shouldFetchNewRecords()) {
+            fetchNewRecords();
+        }
+        emitNewRecord(collector);
+    }
+
+ void ack (KinesisMessageId kinesisMessageId) {
+ // for an acked message add it to acked set and remove it from emitted and failed
+ String shardId = kinesisMessageId.getShardId();
+ BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
+ LOG.debug("Ack received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+ // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while committing we need to figure out what is the
+ // highest sequence number that can be committed for this shard
+ if (!ackedPerShard.containsKey(shardId)) {
+ ackedPerShard.put(shardId, new TreeSet<BigInteger>());
+ }
+ ackedPerShard.get(shardId).add(sequenceNumber);
+ // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard(which keeps track of in flight tuples)
+ if (emittedPerShard.containsKey(shardId)) {
+ TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+ emitted.remove(sequenceNumber);
+ }
+ // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive coding.
+ // Remove it from failedPerShard anyway
+ if (failedPerShard.containsKey(shardId)) {
+ failedPerShard.get(shardId).remove(sequenceNumber);
+ }
+ // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in failedAndFetchedRecords. We use that to
+ // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to clean up memory
+ if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
+ kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
+ failedandFetchedRecords.remove(kinesisMessageId);
+ }
+ // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
+ if (deactivated) {
+ commit();
+ }
+ }
+
+    /**
+     * Records a failed tuple.
+     * <p>
+     * If the retry handler schedules a retry, the sequence number moves from the shard's emitted set to its failed
+     * set. Otherwise the message is treated as acked, so it no longer blocks commits. Commits immediately while
+     * deactivated.
+     *
+     * @param kinesisMessageId id of the failed message
+     */
+    void fail (KinesisMessageId kinesisMessageId) {
+        String shardId = kinesisMessageId.getShardId();
+        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
+        LOG.debug("Fail received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
+        if (!kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
+            // no retry: treat as acked. ack() already commits on its own while deactivated, so return here instead of
+            // committing a second time below.
+            ack(kinesisMessageId);
+            return;
+        }
+        TreeSet<BigInteger> failed = failedPerShard.get(shardId);
+        if (failed == null) {
+            failed = new TreeSet<BigInteger>();
+            failedPerShard.put(shardId, failed);
+        }
+        failed.add(sequenceNumber);
+        // guard like ack() does: get() can return null if no emitted set exists for this shard, which would NPE
+        TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+        if (emitted != null) {
+            emitted.remove(sequenceNumber);
+        }
+        // keep committing when topology is deactivated since ack and fail keep getting called on deactivated topology
+        if (deactivated) {
+            commit();
+        }
+    }
+
+    /**
+     * Writes, for each assigned shard, the highest sequence number that is safe to commit to zookeeper.
+     * <p>
+     * Every record sits in exactly one of three per-shard sets:
+     * <ul>
+     * <li>emittedPerShard: in flight;</li>
+     * <li>ackedPerShard: acked;</li>
+     * <li>failedPerShard: failed and awaiting retry.</li>
+     * </ul>
+     * An acked sequence number X can be committed only if no emitted or failed sequence number Y satisfies Y &lt; X.
+     * For example, with acked {1,4,5}, emitted {2,6} and failed {3,7}, only 1 can be committed.
+     * <p>
+     * Committed numbers are removed from the acked set. The last-commit time is refreshed even when nothing is
+     * written.
+     */
+    void commit () {
+        for (String shardId : toEmitPerShard.keySet()) {
+            TreeSet<BigInteger> acked = ackedPerShard.get(shardId);
+            if (acked == null) {
+                continue;
+            }
+            BigInteger lowestPending = lowestPendingSequenceNumber(shardId);
+            BigInteger highestCommittable = null;
+            for (Iterator<BigInteger> it = acked.iterator(); it.hasNext(); ) {
+                BigInteger candidate = it.next();
+                // stop at the first acked number that is not strictly below every pending one
+                if (lowestPending != null && lowestPending.compareTo(candidate) <= 0) {
+                    break;
+                }
+                highestCommittable = candidate;
+                it.remove();
+            }
+            if (highestCommittable == null) {
+                continue;
+            }
+            Map<Object, Object> state = new HashMap<>();
+            state.put("committedSequenceNumber", highestCommittable.toString());
+            LOG.debug("Committing sequence number " + highestCommittable.toString() + " for shardId " + shardId);
+            zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
+        }
+        lastCommitTime = System.currentTimeMillis();
+    }
+
+    // smallest sequence number of the shard that is still in flight or awaiting retry, or null if there is none
+    private BigInteger lowestPendingSequenceNumber (String shardId) {
+        BigInteger lowest = null;
+        TreeSet<BigInteger> failed = failedPerShard.get(shardId);
+        if (failed != null && !failed.isEmpty()) {
+            lowest = failed.first();
+        }
+        TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
+        if (emitted != null && !emitted.isEmpty()) {
+            BigInteger smallestEmitted = emitted.first();
+            if (lowest == null || lowest.compareTo(smallestEmitted) > 0) {
+                lowest = smallestEmitted;
+            }
+        }
+        return lowest;
+    }
+
+    /**
+     * Clears the deactivated flag, so ack and fail stop forcing a commit, and initializes the Kinesis connection.
+     */
+    void activate() {
+        LOG.info("Activate called");
+        this.deactivated = false;
+        this.kinesisConnection.initialize();
+    }
+
+    /**
+     * Sets the deactivated flag, so every subsequent ack/fail commits immediately. Then commits what can be
+     * committed so far and shuts down the Kinesis connection.
+     */
+    void deactivate() {
+        LOG.info("Deactivate called");
+        this.deactivated = true;
+        this.commit();
+        this.kinesisConnection.shutdown();
+    }
+
+    /**
+     * Final shutdown: commits whatever is safe to commit, then shuts down the Kinesis and ZooKeeper connections.
+     * <p>
+     * The connections are shut down even when the final commit (or the Kinesis shutdown) throws, so a
+     * ZooKeeper failure during the last commit does not leak them. The original exception still propagates.
+     */
+    void close() {
+        try {
+            commit();
+        } finally {
+            try {
+                kinesisConnection.shutdown();
+            } finally {
+                zkConnection.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Fetches records from Kinesis starting at the sequence number of the given failed message.
+     * <p>
+     * Any fetched record whose sequence number is in the shard's failed set is kept in failedandFetchedRecords,
+     * so retrying it does not require another trip to Kinesis. The per-message shard iterator is replaced by the
+     * next iterator after each fetch, so repeated calls move forward through the shard until the message shows
+     * up. An expired iterator is refreshed. An empty fetch or a ProvisionedThroughputExceededException causes a
+     * 1 second sleep. If the thread is interrupted during that sleep, its interrupt status is restored rather
+     * than swallowed.
+     *
+     * @param kinesisMessageId the failed message whose record should be fetched
+     */
+    private void fetchFailedRecords(KinesisMessageId kinesisMessageId) {
+        // if shard iterator not present for this message, get it
+        if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
+            refreshShardIteratorForFailedRecord(kinesisMessageId);
+        }
+        String shardIterator = shardIteratorPerFailedMessage.get(kinesisMessageId);
+        LOG.debug("Fetching failed records for shard id :" + kinesisMessageId.getShardId() + " at sequence number " + kinesisMessageId.getSequenceNumber() +
+                " using shardIterator " + shardIterator);
+        try {
+            GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
+            if (getRecordsResult != null) {
+                List<Record> records = getRecordsResult.getRecords();
+                LOG.debug("Records size from fetchFailedRecords is " + records.size());
+                // update the shard iterator to next one in case this fetch does not give the message.
+                shardIteratorPerFailedMessage.put(kinesisMessageId, getRecordsResult.getNextShardIterator());
+                if (records.isEmpty()) {
+                    LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
+                    Thread.sleep(1000);
+                } else {
+                    // keep every fetched record that is also waiting for a retry, not just the requested one
+                    String shardId = kinesisMessageId.getShardId();
+                    TreeSet<BigInteger> failedSequenceNumbers = failedPerShard.get(shardId);
+                    for (Record record : records) {
+                        KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), shardId, record.getSequenceNumber());
+                        if (failedSequenceNumbers.contains(new BigInteger(current.getSequenceNumber()))) {
+                            failedandFetchedRecords.put(current, record);
+                            shardIteratorPerFailedMessage.remove(current);
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException ie) {
+            LOG.warn("Thread interrupted while sleeping", ie);
+            // restore the interrupt status so the executor thread can still observe the interruption
+            Thread.currentThread().interrupt();
+        } catch (ExpiredIteratorException ex) {
+            LOG.warn("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
+            refreshShardIteratorForFailedRecord(kinesisMessageId);
+        } catch (ProvisionedThroughputExceededException pe) {
+            try {
+                LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LOG.warn("Thread interrupted exception", e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Fetches the next batch of records for every shard and appends it to that shard's toEmit queue.
+     * <p>
+     * Uses each shard's current iterator, then records the next shard iterator and the last fetched sequence
+     * number. An empty batch or a ProvisionedThroughputExceededException causes a 1 second sleep. An expired
+     * iterator is refreshed via refreshShardIteratorForNewRecords. If the thread is interrupted while sleeping,
+     * the interrupt status is restored and the remaining shards are skipped, so the caller can react to it.
+     */
+    private void fetchNewRecords() {
+        for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) {
+            String shardId = entry.getKey();
+            try {
+                String shardIterator = shardIteratorPerShard.get(shardId);
+                LOG.debug("Fetching new records for shard id :" + shardId + " using shardIterator " + shardIterator + " after sequence number " +
+                        fetchedSequenceNumberPerShard.get(shardId));
+                GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
+                if (getRecordsResult != null) {
+                    List<Record> records = getRecordsResult.getRecords();
+                    LOG.debug("Records size from fetchNewRecords is " + records.size());
+                    // update the shard iterator to next one in case this fetch does not give the message.
+                    shardIteratorPerShard.put(shardId, getRecordsResult.getNextShardIterator());
+                    if (records.isEmpty()) {
+                        LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
+                        Thread.sleep(1000);
+                    } else {
+                        entry.getValue().addAll(records);
+                        fetchedSequenceNumberPerShard.put(shardId, records.get(records.size() - 1).getSequenceNumber());
+                    }
+                }
+            } catch (InterruptedException ie) {
+                LOG.warn("Thread interrupted while sleeping", ie);
+                // restore the interrupt status instead of swallowing it, and stop fetching further shards
+                Thread.currentThread().interrupt();
+                return;
+            } catch (ExpiredIteratorException ex) {
+                LOG.warn("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator");
+                refreshShardIteratorForNewRecords(shardId);
+            } catch (ProvisionedThroughputExceededException pe) {
+                try {
+                    LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    LOG.warn("Thread interrupted exception", e);
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+    }
+
+ private void emitNewRecord (SpoutOutputCollector collector) {
+ for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
+ String shardId = entry.getKey();
+ LinkedList<Record> listOfRecords = entry.getValue();
+ Record record;
+ while ((record = listOfRecords.pollFirst()) != null) {
+ KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(), shardId, record.getSequenceNumber());
+ if (emitRecord(collector, record, kinesisMessageId)) {
+ return;
+ }
+ }
+ }
+ }
+
+    /**
+     * Re-emits a failed message if its record has already been fetched into failedandFetchedRecords.
+     *
+     * @return {@code false} if the record has not been fetched yet or maps to no tuple; {@code true} if a tuple
+     *         was emitted
+     */
+    private boolean emitFailedRecord(SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
+        return failedandFetchedRecords.containsKey(kinesisMessageId)
+                && emitRecord(collector, failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId);
+    }
+
+ private boolean emitRecord (SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
+ boolean result = false;
+ List<Object> tuple = kinesisConfig.getRecordToTupleMapper().getTuple(record);
+ // if a record is returned put the sequence number in the emittedPerShard to tie back with ack or fail
+ if (tuple != null && tuple.size() > 0) {
+ collector.emit(tuple, kinesisMessageId);
+ if (!emittedPerShard.containsKey(kinesisMessageId.getShardId())) {
+ emittedPerShard.put(kinesisMessageId.getShardId(), new TreeSet<BigInteger>());
+ }
+ emittedPerShard.get(kinesisMessageId.getShardId()).add(new BigInteger(record.getSequenceNumber()));
+ result = true;
+ } else {
+ // ack to not process the record again on restart and move on to next message
+ LOG.warn("Record " + record + " did not return a tuple to emit. Hence acking it");
+ ack(kinesisMessageId);
+ }
+ return result;
+ }
+
+    /** Returns true once at least the configured ZooKeeper commit interval has passed since the last commit. */
+    private boolean shouldCommit() {
+        long elapsedMs = System.currentTimeMillis() - lastCommitTime;
+        return elapsedMs >= kinesisConfig.getZkInfo().getCommitIntervalMs();
+    }
+
+    /**
+     * Seeds fetchedSequenceNumberPerShard with the sequence number last committed to ZooKeeper for each shard.
+     * When a shard iterator is later refreshed, fetching then resumes after that sequence number. Shards with no
+     * stored state, or no committed value, are left untouched.
+     */
+    private void initializeFetchedSequenceNumbers() {
+        for (String shardId : toEmitPerShard.keySet()) {
+            Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId);
+            if (state == null) {
+                continue; // nothing stored for this shard yet
+            }
+            Object committedSequenceNumber = state.get("committedSequenceNumber");
+            LOG.info("State read is committedSequenceNumber: " + committedSequenceNumber + " shardId:" + shardId);
+            if (committedSequenceNumber != null) {
+                fetchedSequenceNumberPerShard.put(shardId, (String) committedSequenceNumber);
+            }
+        }
+    }
+
+    /** Refreshes the new-records shard iterator of every shard this manager reads from. */
+    private void refreshShardIteratorsForNewRecords() {
+        for (Map.Entry<String, LinkedList<Record>> shardEntry : toEmitPerShard.entrySet()) {
+            refreshShardIteratorForNewRecords(shardEntry.getKey());
+        }
+    }
+
+    /**
+     * Obtains a fresh shard iterator for reading new records from {@code shardId}.
+     * <p>
+     * If a sequence number has already been fetched for the shard, the iterator starts after it
+     * (AFTER_SEQUENCE_NUMBER). Otherwise the configured iterator type and timestamp are used. A null or empty
+     * iterator leaves the existing entry unchanged.
+     */
+    private void refreshShardIteratorForNewRecords(String shardId) {
+        String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId);
+        ShardIteratorType iteratorType;
+        if (lastFetchedSequenceNumber != null) {
+            iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+        } else {
+            iteratorType = kinesisConfig.getShardIteratorType();
+        }
+        String iterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), shardId, iteratorType,
+                lastFetchedSequenceNumber, kinesisConfig.getTimestamp());
+        if (iterator == null || iterator.isEmpty()) {
+            return;
+        }
+        LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + iterator);
+        shardIteratorPerShard.put(shardId, iterator);
+    }
+
+    /**
+     * Obtains a shard iterator of type AT_SEQUENCE_NUMBER positioned at the failed message's own sequence number
+     * and stores it in shardIteratorPerFailedMessage. A null or empty iterator leaves the existing entry unchanged.
+     */
+    private void refreshShardIteratorForFailedRecord(KinesisMessageId kinesisMessageId) {
+        String iterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), kinesisMessageId.getShardId(),
+                ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+        boolean usable = iterator != null && !iterator.isEmpty();
+        if (usable) {
+            LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + iterator);
+            shardIteratorPerFailedMessage.put(kinesisMessageId, iterator);
+        }
+    }
+
+    /**
+     * Counts, across all shards, the sequence numbers not yet committed. These are the ones still in flight
+     * (emitted), acked but not yet committed, and failed while waiting for a retry.
+     * <p>
+     * Accumulates in a primitive {@code long} to avoid re-boxing on every addition; the boxed return type is
+     * kept for callers.
+     *
+     * @return total number of uncommitted sequence numbers
+     */
+    private Long getUncommittedRecordsCount() {
+        long result = 0L;
+        for (TreeSet<BigInteger> emitted : emittedPerShard.values()) {
+            result += emitted.size();
+        }
+        for (TreeSet<BigInteger> acked : ackedPerShard.values()) {
+            result += acked.size();
+        }
+        for (TreeSet<BigInteger> failed : failedPerShard.values()) {
+            result += failed.size();
+        }
+        LOG.debug("Returning uncommittedRecordsCount as " + result);
+        return result;
+    }
+
+    /**
+     * Returns true only when every shard's toEmit queue is empty. As long as any shard still has fetched records
+     * waiting to be emitted, no new records are fetched.
+     */
+    private boolean shouldFetchNewRecords() {
+        for (LinkedList<Record> pending : toEmitPerShard.values()) {
+            if (!pending.isEmpty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
new file mode 100644
index 0000000..500195b
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+
+import java.util.Map;
+
+/**
+ * Storm spout that reads records from a Kinesis stream.
+ * <p>
+ * All fetching, emitting, ack/fail bookkeeping and sequence-number commits are delegated to a
+ * {@link KinesisRecordsManager}, which is created in {@link #open} from the supplied {@link KinesisConfig}.
+ */
+public class KinesisSpout extends BaseRichSpout {
+
+    private final KinesisConfig kinesisConfig;
+    private transient KinesisRecordsManager kinesisRecordsManager;
+    private transient SpoutOutputCollector collector;
+
+    /**
+     * @param kinesisConfig stream, record-to-tuple mapping, retry and ZooKeeper settings for this spout
+     */
+    public KinesisSpout(KinesisConfig kinesisConfig) {
+        this.kinesisConfig = kinesisConfig;
+    }
+
+    /** Declares the output fields reported by the configured {@link RecordToTupleMapper}. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(this.kinesisConfig.getRecordToTupleMapper().getOutputFields());
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return super.getComponentConfiguration();
+    }
+
+    /**
+     * Creates the records manager and initializes it with this task's index and the total number of tasks
+     * of this component.
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+        this.kinesisRecordsManager = new KinesisRecordsManager(this.kinesisConfig);
+        final int taskIndex = context.getThisTaskIndex();
+        final int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        this.kinesisRecordsManager.initialize(taskIndex, taskCount);
+    }
+
+    @Override
+    public void close() {
+        this.kinesisRecordsManager.close();
+    }
+
+    @Override
+    public void activate() {
+        this.kinesisRecordsManager.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        this.kinesisRecordsManager.deactivate();
+    }
+
+    /** Forwards the ack; message ids emitted by this spout are always {@link KinesisMessageId}s. */
+    @Override
+    public void ack(Object msgId) {
+        this.kinesisRecordsManager.ack((KinesisMessageId) msgId);
+    }
+
+    /** Forwards the failure; message ids emitted by this spout are always {@link KinesisMessageId}s. */
+    @Override
+    public void fail(Object msgId) {
+        this.kinesisRecordsManager.fail((KinesisMessageId) msgId);
+    }
+
+    @Override
+    public void nextTuple() {
+        this.kinesisRecordsManager.next(this.collector);
+    }
+}
+
+
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
new file mode 100644
index 0000000..c806539
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.apache.storm.tuple.Fields;
+
+import java.util.List;
+
+/**
+ * Converts Kinesis {@link Record}s into the values of the tuples emitted by the Kinesis spout.
+ */
+public interface RecordToTupleMapper {
+
+    /**
+     * Names the fields of the tuples produced by {@link #getTuple(Record)}.
+     *
+     * @return names of fields in the emitted tuple
+     */
+    Fields getOutputFields();
+
+    /**
+     * Maps a single Kinesis record to tuple values.
+     * <p>
+     * The spout acks a record without emitting anything when this returns {@code null}, and treats an empty
+     * list the same way.
+     *
+     * @param record kinesis record
+     * @return storm tuple to be emitted for this record, or {@code null} if no tuple should be emitted
+     */
+    List<Object> getTuple(Record record);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
new file mode 100644
index 0000000..41151d1
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+class ZKConnection {
+
+ private final ZkInfo zkInfo;
+ private CuratorFramework curatorFramework;
+
+ ZKConnection (ZkInfo zkInfo) {
+ this.zkInfo = zkInfo;
+ }
+
+ void initialize () {
+ curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
+ RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
+ curatorFramework.start();
+ }
+
+ void commitState (String stream, String shardId, Map<Object, Object> state) {
+ byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
+ try {
+ String path = getZkPath(stream, shardId);
+ if (curatorFramework.checkExists().forPath(path) == null) {
+ curatorFramework.create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(path, bytes);
+ } else {
+ curatorFramework.setData().forPath(path, bytes);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ Map<Object, Object> readState (String stream, String shardId) {
+ try {
+ String path = getZkPath(stream, shardId);
+ Map<Object, Object> state = null;
+ byte[] b = null;
+ if (curatorFramework.checkExists().forPath(path) != null) {
+ b = curatorFramework.getData().forPath(path);
+ }
+ if (b != null) {
+ state = (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
+ }
+ return state;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ void shutdown () {
+ curatorFramework.close();
+ }
+
+ private String getZkPath (String stream, String shardId) {
+ String path = "";
+ if (!zkInfo.getZkNode().startsWith("/")) {
+ path += "/";
+ }
+ path += zkInfo.getZkNode();
+ if (!zkInfo.getZkNode().endsWith("/")) {
+ path += "/";
+ }
+ path += (stream + "/" + shardId);
+ return path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/eed4b423/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
----------------------------------------------------------------------
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
new file mode 100644
index 0000000..a47f0ab
--- /dev/null
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+/**
+ * ZooKeeper settings the Kinesis spout uses to commit the sequence numbers it has processed.
+ * <p>
+ * Validated on construction: zkUrl and zkNode must be non-empty. Every timeout, interval and retry setting
+ * must be non-null and strictly positive.
+ */
+public class ZkInfo implements Serializable {
+    // comma separated list of zk connect strings to connect to zookeeper e.g. localhost:2181
+    private final String zkUrl;
+    // zk node under which to commit the sequence number of messages. e.g. /committed_sequence_numbers
+    private final String zkNode;
+    // zk session timeout in milliseconds
+    private final Integer sessionTimeoutMs;
+    // zk connection timeout in milliseconds
+    private final Integer connectionTimeoutMs;
+    // interval at which to commit offsets to zk in milliseconds
+    private final Long commitIntervalMs;
+    // number of retry attempts for zk
+    private final Integer retryAttempts;
+    // time to sleep between retries in milliseconds
+    private final Integer retryIntervalMs;
+
+    /**
+     * @param zkUrl               ZooKeeper connect string, e.g. {@code localhost:2181}
+     * @param zkNode              node under which committed sequence numbers are stored
+     * @param sessionTimeoutMs    ZooKeeper session timeout in milliseconds
+     * @param connectionTimeoutMs ZooKeeper connection timeout in milliseconds
+     * @param commitIntervalMs    interval between commits to ZooKeeper in milliseconds
+     * @param retryAttempts       number of retry attempts for ZooKeeper operations
+     * @param retryIntervalMs     time to sleep between retries in milliseconds
+     * @throws IllegalArgumentException if zkUrl or zkNode is null or empty, or any numeric setting is null or
+     *                                  not positive
+     */
+    public ZkInfo(String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs,
+                  Integer retryAttempts, Integer retryIntervalMs) {
+        this.zkUrl = zkUrl;
+        this.zkNode = zkNode;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.connectionTimeoutMs = connectionTimeoutMs;
+        this.commitIntervalMs = commitIntervalMs;
+        this.retryAttempts = retryAttempts;
+        this.retryIntervalMs = retryIntervalMs;
+        validate();
+    }
+
+    public String getZkUrl() {
+        return zkUrl;
+    }
+
+    public String getZkNode() {
+        return zkNode;
+    }
+
+    public Integer getSessionTimeoutMs() {
+        return sessionTimeoutMs;
+    }
+
+    public Integer getConnectionTimeoutMs() {
+        return connectionTimeoutMs;
+    }
+
+    public Long getCommitIntervalMs() {
+        return commitIntervalMs;
+    }
+
+    public Integer getRetryAttempts() {
+        return retryAttempts;
+    }
+
+    public Integer getRetryIntervalMs() {
+        return retryIntervalMs;
+    }
+
+    private void validate() {
+        if (zkUrl == null || zkUrl.length() < 1) {
+            throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper");
+        }
+        if (zkNode == null || zkNode.length() < 1) {
+            throw new IllegalArgumentException("zkNode must be specified");
+        }
+        checkPositive(sessionTimeoutMs, "sessionTimeoutMs");
+        checkPositive(connectionTimeoutMs, "connectionTimeoutMs");
+        checkPositive(commitIntervalMs, "commitIntervalMs");
+        checkPositive(retryAttempts, "retryAttempts");
+        checkPositive(retryIntervalMs, "retryIntervalMs");
+    }
+
+    // Rejects null as well as zero or negative values. The '||' matters: a null must be rejected before it is
+    // unboxed, and a non-null value must still be range-checked.
+    private static void checkPositive(Number argument, String name) {
+        if (argument == null || argument.longValue() <= 0) {
+            throw new IllegalArgumentException(name + " must be positive");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ZkInfo{" +
+                "zkUrl='" + zkUrl + '\'' +
+                ", zkNode='" + zkNode + '\'' +
+                ", sessionTimeoutMs=" + sessionTimeoutMs +
+                ", connectionTimeoutMs=" + connectionTimeoutMs +
+                ", commitIntervalMs=" + commitIntervalMs +
+                ", retryAttempts=" + retryAttempts +
+                ", retryIntervalMs=" + retryIntervalMs +
+                '}';
+    }
+
+}