You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/06/27 17:42:02 UTC
[storm] branch master updated: STORM-3421: Fix checkstyle
violations in storm-kinesis
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 34750d5 STORM-3421: Fix checkstyle violations in storm-kinesis
new f617c6e Merge pull request #3036 from krichter722/checkstyle-kinesis
34750d5 is described below
commit 34750d557b82cbdfdef379ab9b7162142a06b169
Author: Karl-Philipp Richter <kr...@posteo.de>
AuthorDate: Mon Jun 24 01:15:03 2019 +0200
STORM-3421: Fix checkstyle violations in storm-kinesis
---
external/storm-kinesis/pom.xml | 2 +-
.../kinesis/spout/CredentialsProviderChain.java | 5 +-
.../kinesis/spout/ExponentialBackoffRetrier.java | 33 +++--
.../kinesis/spout/FailedMessageRetryHandler.java | 22 +--
.../apache/storm/kinesis/spout/KinesisConfig.java | 59 ++++----
.../storm/kinesis/spout/KinesisConnection.java | 43 ++++--
.../storm/kinesis/spout/KinesisConnectionInfo.java | 26 ++--
.../storm/kinesis/spout/KinesisMessageId.java | 40 +++---
.../storm/kinesis/spout/KinesisRecordsManager.java | 157 +++++++++++++--------
.../apache/storm/kinesis/spout/KinesisSpout.java | 8 +-
.../storm/kinesis/spout/RecordToTupleMapper.java | 12 +-
.../spout/{ZKConnection.java => ZkConnection.java} | 26 ++--
.../org/apache/storm/kinesis/spout/ZkInfo.java | 34 +++--
13 files changed, 279 insertions(+), 188 deletions(-)
diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml
index 6f04950..39c89f1 100644
--- a/external/storm-kinesis/pom.xml
+++ b/external/storm-kinesis/pom.xml
@@ -62,7 +62,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>185</maxAllowedViolations>
+ <maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
index 4287ae0..5820851 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
@@ -26,10 +26,11 @@ import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
/**
- * Class representing chain of mechanisms that will be used in order to connect to kinesis
+ * Class representing chain of mechanisms that will be used in order to connect to kinesis.
*/
public class CredentialsProviderChain extends AWSCredentialsProviderChain {
- public CredentialsProviderChain () {
+
+ public CredentialsProviderChain() {
super(new EnvironmentVariableCredentialsProvider(),
new SystemPropertiesCredentialsProvider(),
new ClasspathPropertiesFileCredentialsProvider(),
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
index 88e8d70..2920913 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
@@ -18,9 +18,6 @@
package org.apache.storm.kinesis.spout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.Serializable;
import java.util.Comparator;
import java.util.HashMap;
@@ -28,6 +25,9 @@ import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
// Wait interfal for retrying after first failure
@@ -44,29 +44,29 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
/**
- * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) secs for
- * retry i where i = 2,3,
+ * No args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an
+ * exponential backoff of {@code Math.pow(2,i-1)} secs for retry {@code i} where {@code i = 2,3,...}.
*/
- public ExponentialBackoffRetrier () {
+ public ExponentialBackoffRetrier() {
this(100L, 2L, Long.MAX_VALUE);
}
/**
- *
+ * Creates a new exponential backoff retrier.
* @param initialDelayMillis delay in milliseconds for first retry
* @param baseSeconds base for exponent function in seconds
* @param maxRetries maximum number of retries before the record is discarded/acked
*/
- public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
+ public ExponentialBackoffRetrier(Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
this.initialDelayMillis = initialDelayMillis;
this.baseSeconds = baseSeconds;
this.maxRetries = maxRetries;
validate();
}
- private void validate () {
+ private void validate() {
if (initialDelayMillis < 0) {
- throw new IllegalArgumentException("initialDelayMillis cannot be negative." );
+ throw new IllegalArgumentException("initialDelayMillis cannot be negative.");
}
if (baseSeconds < 0) {
throw new IllegalArgumentException("baseSeconds cannot be negative.");
@@ -75,6 +75,7 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
throw new IllegalArgumentException("maxRetries cannot be negative.");
}
}
+
@Override
public boolean failed(KinesisMessageId messageId) {
LOG.debug("Handling failed message {}", messageId);
@@ -114,7 +115,7 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
public KinesisMessageId getNextFailedMessageToRetry() {
KinesisMessageId result = null;
// return the first message to be retried from the set. It will return the message with the earliest retry time <= current time
- if (!retryMessageSet.isEmpty() ) {
+ if (!retryMessageSet.isEmpty()) {
result = retryMessageSet.first();
if (!(retryTimes.get(result) <= System.nanoTime())) {
result = null;
@@ -126,15 +127,19 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
@Override
public void failedMessageEmitted(KinesisMessageId messageId) {
- // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and wait for its ack or fail
+ // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and
+ // wait for its ack or fail
// but still keep it in counts map to retry again on failure or remove on ack
LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", messageId);
retryMessageSet.remove(messageId);
retryTimes.remove(messageId);
}
- // private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to Long.MAX_VALUE)
- private Long getRetryTime (Long retryNum) {
+ /**
+ * private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to
+ * Long.MAX_VALUE).
+ */
+ private Long getRetryTime(Long retryNum) {
Long retryTime = System.nanoTime();
Long nanoMultiplierForMillis = 1000000L;
// if first retry then retry time = current time + initial delay
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
index bb0e450..c005c59 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
@@ -22,27 +22,27 @@ import java.io.Serializable;
public interface FailedMessageRetryHandler extends Serializable {
/**
- * message with messageId failed in the spout
- * @param messageId
+ * message with messageId failed in the spout.
+ * @param messageId the message id
* @return true if this failed message was scheduled to be retried, false otherwise
*/
- boolean failed (KinesisMessageId messageId);
+ boolean failed(KinesisMessageId messageId);
/**
- * message with messageId succeeded/acked in the spout
- * @param messageId
+ * message with messageId succeeded/acked in the spout.
+ * @param messageId the message id
*/
- void acked (KinesisMessageId messageId);
+ void acked(KinesisMessageId messageId);
/**
- * Get the next failed message's id to retry if any, null otherwise
+ * Get the next failed message's id to retry if any, null otherwise.
* @return messageId
*/
- KinesisMessageId getNextFailedMessageToRetry ();
+ KinesisMessageId getNextFailedMessageToRetry();
/**
- * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout
- * @param messageId
+ * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout.
+ * @param messageId the message id
*/
- void failedMessageEmitted (KinesisMessageId messageId);
+ void failedMessageEmitted(KinesisMessageId messageId);
}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
index 744aef5..eeb5896 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
@@ -38,13 +38,22 @@ public class KinesisConfig implements Serializable {
private final ZkInfo zkInfo;
// object representing information on paramaters to use while connecting to kinesis using kinesis client
private final KinesisConnectionInfo kinesisConnectionInfo;
- // this number represents the number of messages that are still not committed to zk. it will prevent the spout from emitting further.
- // for e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still cant be committed to zk because 1 is still in failed set. As a result
- // the acked queue can infinitely grow without any of them being committed to zk. topology max pending does not help since from storm's view they are acked
+ /**
+ * This number represents the number of messages that are still not committed to zk. it will prevent the spout from
+ * emitting further. for e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still can't be
+ * committed to zk because 1 is still in failed set. As a result the acked queue can infinitely grow without any of
+ * them being committed to zk. topology max pending does not help since from storm's view they are acked
+ */
private final Long maxUncommittedRecords;
- public KinesisConfig(String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp, FailedMessageRetryHandler
- failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) {
+ public KinesisConfig(String streamName,
+ ShardIteratorType shardIteratorType,
+ RecordToTupleMapper recordToTupleMapper,
+ Date timestamp,
+ FailedMessageRetryHandler failedMessageRetryHandler,
+ ZkInfo zkInfo,
+ KinesisConnectionInfo kinesisConnectionInfo,
+ Long maxUncommittedRecords) {
this.streamName = streamName;
this.shardIteratorType = shardIteratorType;
this.recordToTupleMapper = recordToTupleMapper;
@@ -56,14 +65,16 @@ public class KinesisConfig implements Serializable {
validate();
}
- private void validate () {
+ private void validate() {
if (streamName == null || streamName.length() < 1) {
throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
}
- if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType
- .AT_SEQUENCE_NUMBER)) {
- throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST +
- "," + ShardIteratorType.TRIM_HORIZON);
+ if (shardIteratorType == null
+ || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+ || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+ throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP
+ + "," + ShardIteratorType.LATEST
+ + "," + ShardIteratorType.TRIM_HORIZON);
}
if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) {
throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
@@ -101,33 +112,33 @@ public class KinesisConfig implements Serializable {
return timestamp;
}
- public FailedMessageRetryHandler getFailedMessageRetryHandler () {
+ public FailedMessageRetryHandler getFailedMessageRetryHandler() {
return failedMessageRetryHandler;
}
- public ZkInfo getZkInfo () {
+ public ZkInfo getZkInfo() {
return zkInfo;
}
- public KinesisConnectionInfo getKinesisConnectionInfo () {
+ public KinesisConnectionInfo getKinesisConnectionInfo() {
return kinesisConnectionInfo;
}
- public Long getMaxUncommittedRecords () {
+ public Long getMaxUncommittedRecords() {
return maxUncommittedRecords;
}
@Override
public String toString() {
- return "KinesisConfig{" +
- "streamName='" + streamName + '\'' +
- ", shardIteratorType=" + shardIteratorType +
- ", recordToTupleMapper=" + recordToTupleMapper +
- ", timestamp=" + timestamp +
- ", zkInfo=" + zkInfo +
- ", kinesisConnectionInfo=" + kinesisConnectionInfo +
- ", failedMessageRetryHandler =" + failedMessageRetryHandler +
- ", maxUncommittedRecords=" + maxUncommittedRecords +
- '}';
+ return "KinesisConfig{"
+ + "streamName='" + streamName + '\''
+ + ", shardIteratorType=" + shardIteratorType
+ + ", recordToTupleMapper=" + recordToTupleMapper
+ + ", timestamp=" + timestamp
+ + ", zkInfo=" + zkInfo
+ + ", kinesisConnectionInfo=" + kinesisConnectionInfo
+ + ", failedMessageRetryHandler =" + failedMessageRetryHandler
+ + ", maxUncommittedRecords=" + maxUncommittedRecords
+ + '}';
}
}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
index dfd9049..b8b969a 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
@@ -28,28 +28,30 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
class KinesisConnection {
private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
private final KinesisConnectionInfo kinesisConnectionInfo;
private AmazonKinesisClient kinesisClient;
- KinesisConnection (KinesisConnectionInfo kinesisConnectionInfo) {
+ KinesisConnection(KinesisConnectionInfo kinesisConnectionInfo) {
this.kinesisConnectionInfo = kinesisConnectionInfo;
}
- void initialize () {
- kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), kinesisConnectionInfo.getClientConfiguration());
+ void initialize() {
+ kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(),
+ kinesisConnectionInfo.getClientConfiguration());
kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion()));
}
- List<Shard> getShardsForStream (String stream) {
+ List<Shard> getShardsForStream(String stream) {
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(stream);
List<Shard> shards = new ArrayList<>();
@@ -63,19 +65,24 @@ class KinesisConnection {
} else {
exclusiveStartShardId = null;
}
- } while ( exclusiveStartShardId != null );
+ } while (exclusiveStartShardId != null);
LOG.info("Number of shards for stream " + stream + " are " + shards.size());
return shards;
}
- String getShardIterator (String stream, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+ String getShardIterator(String stream,
+ String shardId,
+ ShardIteratorType shardIteratorType,
+ String sequenceNumber,
+ Date timestamp) {
String shardIterator = "";
try {
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
getShardIteratorRequest.setStreamName(stream);
getShardIteratorRequest.setShardId(shardId);
getShardIteratorRequest.setShardIteratorType(shardIteratorType);
- if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+ if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+ || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
} else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
getShardIteratorRequest.setTimestamp(timestamp);
@@ -85,15 +92,21 @@ class KinesisConnection {
shardIterator = getShardIteratorResult.getShardIterator();
}
} catch (Exception e) {
- LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
- sequenceNumber + " timestamp " + timestamp, e);
+ LOG.warn("Exception occured while getting shardIterator for shard " + shardId
+ + " shardIteratorType " + shardIteratorType
+ + " sequence number " + sequenceNumber
+ + " timestamp " + timestamp,
+ e);
}
- LOG.warn("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
- sequenceNumber + " timestamp" + timestamp);
+ LOG.warn("Returning shardIterator " + shardIterator
+ + " for shardId " + shardId
+ + " shardIteratorType " + shardIteratorType
+ + " sequenceNumber " + sequenceNumber
+ + " timestamp" + timestamp);
return shardIterator;
}
- GetRecordsResult fetchRecords (String shardIterator) {
+ GetRecordsResult fetchRecords(String shardIterator) {
GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit());
@@ -101,7 +114,7 @@ class KinesisConnection {
return getRecordsResult;
}
- void shutdown () {
+ void shutdown() {
kinesisClient.shutdown();
}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
index 67ca29f..121e0bf 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
@@ -24,13 +24,14 @@ import com.amazonaws.regions.Regions;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.util.Arrays;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
public class KinesisConnectionInfo implements Serializable {
private final byte[] serializedKinesisCredsProvider;
private final byte[] serializedkinesisClientConfig;
@@ -41,13 +42,16 @@ public class KinesisConnectionInfo implements Serializable {
private transient ClientConfiguration clientConfiguration;
/**
- *
+ * Create a new Kinesis connection info.
* @param credentialsProvider implementation to provide credentials to connect to kinesis
* @param clientConfiguration client configuration to pass to kinesis client
* @param region region to connect to
* @param recordsLimit max records to be fetched in a getRecords request to kinesis
*/
- public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) {
+ public KinesisConnectionInfo(AWSCredentialsProvider credentialsProvider,
+ ClientConfiguration clientConfiguration,
+ Regions region,
+ Integer recordsLimit) {
if (recordsLimit == null || recordsLimit <= 0) {
throw new IllegalArgumentException("recordsLimit has to be a positive integer");
}
@@ -82,7 +86,7 @@ public class KinesisConnectionInfo implements Serializable {
return region;
}
- private byte[] getKryoSerializedBytes (final Object obj) {
+ private byte[] getKryoSerializedBytes(final Object obj) {
final Kryo kryo = new Kryo();
final ByteArrayOutputStream os = new ByteArrayOutputStream();
final Output output = new Output(os);
@@ -92,7 +96,7 @@ public class KinesisConnectionInfo implements Serializable {
return os.toByteArray();
}
- private Object getKryoDeserializedObject (final byte[] ser) {
+ private Object getKryoDeserializedObject(final byte[] ser) {
final Kryo kryo = new Kryo();
final Input input = new Input(new ByteArrayInputStream(ser));
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
@@ -101,11 +105,11 @@ public class KinesisConnectionInfo implements Serializable {
@Override
public String toString() {
- return "KinesisConnectionInfo{" +
- "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider) +
- ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig) +
- ", region=" + region +
- ", recordsLimit=" + recordsLimit +
- '}';
+ return "KinesisConnectionInfo{"
+ + "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider)
+ + ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig)
+ + ", region=" + region
+ + ", recordsLimit=" + recordsLimit
+ + '}';
}
}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
index dd239f1..7fd179f 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
@@ -23,48 +23,56 @@ public class KinesisMessageId {
private final String shardId;
private final String sequenceNumber;
- KinesisMessageId (String streamName, String shardId, String sequenceNumber) {
+ KinesisMessageId(String streamName, String shardId, String sequenceNumber) {
this.streamName = streamName;
this.shardId = shardId;
this.sequenceNumber = sequenceNumber;
}
- public String getStreamName () {
+ public String getStreamName() {
return streamName;
}
- public String getShardId () {
+ public String getShardId() {
return shardId;
}
- public String getSequenceNumber () {
+ public String getSequenceNumber() {
return sequenceNumber;
}
@Override
- public String toString () {
- return "KinesisMessageId{" +
- "streamName='" + streamName + '\'' +
- ", shardId='" + shardId + '\'' +
- ", sequenceNumber='" + sequenceNumber + '\'' +
- '}';
+ public String toString() {
+ return "KinesisMessageId{"
+ + "streamName='" + streamName + '\''
+ + ", shardId='" + shardId + '\''
+ + ", sequenceNumber='" + sequenceNumber + '\''
+ + '}';
}
@Override
- public boolean equals (Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
KinesisMessageId that = (KinesisMessageId) o;
- if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) return false;
- if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) return false;
+ if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) {
+ return false;
+ }
+ if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) {
+ return false;
+ }
return !(sequenceNumber != null ? !sequenceNumber.equals(that.sequenceNumber) : that.sequenceNumber != null);
}
@Override
- public int hashCode () {
+ public int hashCode() {
int result = streamName != null ? streamName.hashCode() : 0;
result = 31 * result + (shardId != null ? shardId.hashCode() : 0);
result = 31 * result + (sequenceNumber != null ? sequenceNumber.hashCode() : 0);
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
index 7831193..33dba4a 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -24,9 +24,6 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.math.BigInteger;
import java.util.HashMap;
@@ -36,20 +33,27 @@ import java.util.List;
import java.util.Map;
import java.util.TreeSet;
+import org.apache.storm.spout.SpoutOutputCollector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
class KinesisRecordsManager {
private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
// object handling zk interaction
- private transient ZKConnection zkConnection;
+ private transient ZkConnection zkConnection;
// object handling interaction with kinesis
private transient KinesisConnection kinesisConnection;
// Kinesis Spout KinesisConfig object
- private transient final KinesisConfig kinesisConfig;
+ private final transient KinesisConfig kinesisConfig;
// Queue of records per shard fetched from kinesis and are waiting to be emitted
private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
// Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted
private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>();
- // Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the
- // sequence number to commit. Logic explained in commit
+ /**
+ * Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail.
+ * At the same time order is needed to figure out the sequence number to commit. Logic explained in commit
+ */
private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
// sorted acked sequence numbers - needed to figure out what sequence number can be committed
private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
@@ -66,13 +70,13 @@ class KinesisRecordsManager {
// boolean to track deactivated state
private transient boolean deactivated;
- KinesisRecordsManager (KinesisConfig kinesisConfig) {
+ KinesisRecordsManager(KinesisConfig kinesisConfig) {
this.kinesisConfig = kinesisConfig;
- this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+ this.zkConnection = new ZkConnection(kinesisConfig.getZkInfo());
this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
}
- void initialize (int myTaskIndex, int totalTasks) {
+ void initialize(int myTaskIndex, int totalTasks) {
deactivated = false;
lastCommitTime = System.currentTimeMillis();
kinesisConnection.initialize();
@@ -90,7 +94,7 @@ class KinesisRecordsManager {
refreshShardIteratorsForNewRecords();
}
- void next (SpoutOutputCollector collector) {
+ void next(SpoutOutputCollector collector) {
if (shouldCommit()) {
commit();
}
@@ -98,7 +102,8 @@ class KinesisRecordsManager {
if (failedMessageId != null) {
// if the retry service returns a message that is not in failed set then ignore it. should never happen
BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber());
- if (failedPerShard.containsKey(failedMessageId.getShardId()) && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) {
+ if (failedPerShard.containsKey(failedMessageId.getShardId())
+ && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) {
if (!failedandFetchedRecords.containsKey(failedMessageId)) {
fetchFailedRecords(failedMessageId);
}
@@ -107,8 +112,8 @@ class KinesisRecordsManager {
kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
return;
} else {
- LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId + ". This can happen a few times but should not happen " +
- "infinitely");
+ LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId
+ + ". This can happen a few times but should not happen infinitely");
}
} else {
LOG.warn("failedPerShard does not contain " + failedMessageId + ". This should never happen.");
@@ -132,29 +137,34 @@ class KinesisRecordsManager {
emitNewRecord(collector);
}
- void ack (KinesisMessageId kinesisMessageId) {
+ void ack(KinesisMessageId kinesisMessageId) {
// for an acked message add it to acked set and remove it from emitted and failed
String shardId = kinesisMessageId.getShardId();
BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
LOG.debug("Ack received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber);
- // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while committing we need to figure out what is the
+ // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while
+ // committing we need to figure out what is the
// highest sequence number that can be committed for this shard
if (!ackedPerShard.containsKey(shardId)) {
ackedPerShard.put(shardId, new TreeSet<BigInteger>());
}
ackedPerShard.get(shardId).add(sequenceNumber);
- // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard(which keeps track of in flight tuples)
+ // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard (which
+ // keeps track of in flight tuples)
if (emittedPerShard.containsKey(shardId)) {
TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
emitted.remove(sequenceNumber);
}
- // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive coding.
+ // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard
+ // from failedPerShard. Defensive coding.
// Remove it from failedPerShard anyway
if (failedPerShard.containsKey(shardId)) {
failedPerShard.get(shardId).remove(sequenceNumber);
}
- // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in failedAndFetchedRecords. We use that to
- // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to clean up memory
+ // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in
+ // failedAndFetchedRecords. We use that to
+ // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to
+ // clean up memory
if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
failedandFetchedRecords.remove(kinesisMessageId);
@@ -165,7 +175,7 @@ class KinesisRecordsManager {
}
}
- void fail (KinesisMessageId kinesisMessageId) {
+ void fail(KinesisMessageId kinesisMessageId) {
String shardId = kinesisMessageId.getShardId();
BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
LOG.debug("Fail received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber);
@@ -186,13 +196,18 @@ class KinesisRecordsManager {
}
}
- void commit () {
- // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number can be committed to zookeeper.
- // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack it moves from emittedPerShard to
- // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to failedPerShard. The failed records will move from
+ void commit() {
+ // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number
+ // can be committed to zookeeper.
+ // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack
+ // it moves from emittedPerShard to
+ // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to
+ // failedPerShard. The failed records will move from
// failedPerShard to emittedPerShard when the failed record is emitted again as a retry.
- // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard called X such that there is no sequence
- // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and
+ // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard
+ // called X such that there is no sequence
+ // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5,
+ // emittedPerShard is 2,6 and
// failedPerShard is 3,7 then we can only commit 1 and not 4 because 2 is still pending and 3 has failed
for (String shardId: toEmitPerShard.keySet()) {
if (ackedPerShard.containsKey(shardId)) {
@@ -202,7 +217,8 @@ class KinesisRecordsManager {
}
if (emittedPerShard.containsKey(shardId) && !emittedPerShard.get(shardId).isEmpty()) {
BigInteger smallestEmittedSequenceNumber = emittedPerShard.get(shardId).first();
- if (commitSequenceNumberBound == null || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) {
+ if (commitSequenceNumberBound == null
+ || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) {
commitSequenceNumberBound = smallestEmittedSequenceNumber;
}
}
@@ -210,7 +226,8 @@ class KinesisRecordsManager {
BigInteger ackedSequenceNumberToCommit = null;
while (ackedSequenceNumbers.hasNext()) {
BigInteger ackedSequenceNumber = ackedSequenceNumbers.next();
- if (commitSequenceNumberBound == null || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) {
+ if (commitSequenceNumberBound == null
+ || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) {
ackedSequenceNumberToCommit = ackedSequenceNumber;
ackedSequenceNumbers.remove();
} else {
@@ -220,7 +237,9 @@ class KinesisRecordsManager {
if (ackedSequenceNumberToCommit != null) {
Map<Object, Object> state = new HashMap<>();
state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString());
- LOG.debug("Committing sequence number {} for shardId {}", ackedSequenceNumberToCommit.toString(), shardId);
+ LOG.debug("Committing sequence number {} for shardId {}",
+ ackedSequenceNumberToCommit.toString(),
+ shardId);
zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
}
}
@@ -228,34 +247,38 @@ class KinesisRecordsManager {
lastCommitTime = System.currentTimeMillis();
}
- void activate () {
+ void activate() {
LOG.info("Activate called");
deactivated = false;
kinesisConnection.initialize();
}
- void deactivate () {
+ void deactivate() {
LOG.info("Deactivate called");
deactivated = true;
commit();
kinesisConnection.shutdown();
}
- void close () {
+ void close() {
commit();
kinesisConnection.shutdown();
zkConnection.shutdown();
}
- // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched and are in the failed queue will also
+ // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched
+ // and are in the failed queue will also
// be kept in memory to avoid going to kinesis again for retry
- private void fetchFailedRecords (KinesisMessageId kinesisMessageId) {
+ private void fetchFailedRecords(KinesisMessageId kinesisMessageId) {
// if shard iterator not present for this message, get it
if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
refreshShardIteratorForFailedRecord(kinesisMessageId);
}
String shardIterator = shardIteratorPerFailedMessage.get(kinesisMessageId);
- LOG.debug("Fetching failed records for shard id :{} at sequence number {} using shardIterator {}", kinesisMessageId.getShardId(), kinesisMessageId.getSequenceNumber(), shardIterator);
+ LOG.debug("Fetching failed records for shard id :{} at sequence number {} using shardIterator {}",
+ kinesisMessageId.getShardId(),
+ kinesisMessageId.getSequenceNumber(),
+ shardIterator);
try {
GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
if (getRecordsResult != null) {
@@ -269,7 +292,9 @@ class KinesisRecordsManager {
} else {
// add all fetched records to the set of failed records if they are present in failed set
for (Record record: records) {
- KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), kinesisMessageId.getShardId(), record.getSequenceNumber());
+ KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(),
+ kinesisMessageId.getShardId(),
+ record.getSequenceNumber());
if (failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(current.getSequenceNumber()))) {
failedandFetchedRecords.put(current, record);
shardIteratorPerFailedMessage.remove(current);
@@ -292,12 +317,15 @@ class KinesisRecordsManager {
}
}
- private void fetchNewRecords () {
+ private void fetchNewRecords() {
for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) {
String shardId = entry.getKey();
try {
String shardIterator = shardIteratorPerShard.get(shardId);
- LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}", shardId, shardIterator, fetchedSequenceNumberPerShard.get(shardId));
+ LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}",
+ shardId,
+ shardIterator,
+ fetchedSequenceNumberPerShard.get(shardId));
GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
if (getRecordsResult != null) {
List<Record> records = getRecordsResult.getRecords();
@@ -328,28 +356,30 @@ class KinesisRecordsManager {
}
}
- private void emitNewRecord (SpoutOutputCollector collector) {
+ private void emitNewRecord(SpoutOutputCollector collector) {
for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
String shardId = entry.getKey();
LinkedList<Record> listOfRecords = entry.getValue();
Record record;
while ((record = listOfRecords.pollFirst()) != null) {
- KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(), shardId, record.getSequenceNumber());
+ KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(),
+ shardId,
+ record.getSequenceNumber());
if (emitRecord(collector, record, kinesisMessageId)) {
- return;
+ return;
}
}
}
}
- private boolean emitFailedRecord (SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
+ private boolean emitFailedRecord(SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
if (!failedandFetchedRecords.containsKey(kinesisMessageId)) {
return false;
}
return emitRecord(collector, failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId);
}
- private boolean emitRecord (SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
+ private boolean emitRecord(SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
boolean result = false;
List<Object> tuple = kinesisConfig.getRecordToTupleMapper().getTuple(record);
// if a record is returned put the sequence number in the emittedPerShard to tie back with ack or fail
@@ -368,11 +398,11 @@ class KinesisRecordsManager {
return result;
}
- private boolean shouldCommit () {
+ private boolean shouldCommit() {
return (System.currentTimeMillis() - lastCommitTime >= kinesisConfig.getZkInfo().getCommitIntervalMs());
}
- private void initializeFetchedSequenceNumbers () {
+ private void initializeFetchedSequenceNumbers() {
for (String shardId : toEmitPerShard.keySet()) {
Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId);
// if state found for this shard in zk, then set the sequence number in fetchedSequenceNumber
@@ -386,38 +416,47 @@ class KinesisRecordsManager {
}
}
- private void refreshShardIteratorsForNewRecords () {
+ private void refreshShardIteratorsForNewRecords() {
for (String shardId: toEmitPerShard.keySet()) {
refreshShardIteratorForNewRecords(shardId);
}
}
- private void refreshShardIteratorForNewRecords (String shardId) {
+ private void refreshShardIteratorForNewRecords(String shardId) {
String shardIterator = null;
String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId);
- ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? kinesisConfig.getShardIteratorType() : ShardIteratorType
- .AFTER_SEQUENCE_NUMBER);
+ ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null
+ ? kinesisConfig.getShardIteratorType()
+ : ShardIteratorType.AFTER_SEQUENCE_NUMBER);
// Set the shard iterator for last fetched sequence number to start from correct position in shard
- shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig
- .getTimestamp());
+ shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(),
+ shardId,
+ shardIteratorType,
+ lastFetchedSequenceNumber,
+ kinesisConfig.getTimestamp());
if (shardIterator != null && !shardIterator.isEmpty()) {
- LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator);
+ LOG.warn("Refreshing shard iterator for new records for shardId " + shardId
+ + " with shardIterator " + shardIterator);
shardIteratorPerShard.put(shardId, shardIterator);
}
}
- private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) {
+ private void refreshShardIteratorForFailedRecord(KinesisMessageId kinesisMessageId) {
String shardIterator = null;
// Set the shard iterator for last fetched sequence number to start from correct position in shard
- shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), kinesisMessageId.getShardId(), ShardIteratorType
- .AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+ shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(),
+ kinesisMessageId.getShardId(),
+ ShardIteratorType.AT_SEQUENCE_NUMBER,
+ kinesisMessageId.getSequenceNumber(),
+ null);
if (shardIterator != null && !shardIterator.isEmpty()) {
- LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
+ LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId
+ + " with shardIterator " + shardIterator);
shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
}
}
- private Long getUncommittedRecordsCount () {
+ private Long getUncommittedRecordsCount() {
Long result = 0L;
for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) {
result += emitted.getValue().size();
@@ -432,7 +471,7 @@ class KinesisRecordsManager {
return result;
}
- private boolean shouldFetchNewRecords () {
+ private boolean shouldFetchNewRecords() {
// check to see if any shard has already fetched records waiting to be emitted, in which case dont fetch more
boolean fetchRecords = true;
for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
index da08a50..cf9295a 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
@@ -18,20 +18,20 @@
package org.apache.storm.kinesis.spout;
+import java.util.Map;
+
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
-import java.util.Map;
-
public class KinesisSpout extends BaseRichSpout {
private final KinesisConfig kinesisConfig;
private transient KinesisRecordsManager kinesisRecordsManager;
private transient SpoutOutputCollector collector;
- public KinesisSpout (KinesisConfig kinesisConfig) {
+ public KinesisSpout(KinesisConfig kinesisConfig) {
this.kinesisConfig = kinesisConfig;
}
@@ -41,7 +41,7 @@ public class KinesisSpout extends BaseRichSpout {
}
@Override
- public Map<String, Object> getComponentConfiguration () {
+ public Map<String, Object> getComponentConfiguration() {
return super.getComponentConfiguration();
}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
index c806539..bee9e80 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
@@ -15,24 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kinesis.spout;
import com.amazonaws.services.kinesis.model.Record;
-import org.apache.storm.tuple.Fields;
import java.util.List;
+import org.apache.storm.tuple.Fields;
+
public interface RecordToTupleMapper {
/**
- *
+ * Retrieve the names of fields.
* @return names of fields in the emitted tuple
*/
- Fields getOutputFields ();
+ Fields getOutputFields();
/**
- *
+ * Retrieve the tuple.
* @param record kinesis record
* @return storm tuple to be emitted for this record, null if no tuple should be emitted
*/
- List<Object> getTuple (Record record);
+ List<Object> getTuple(Record record);
}
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java
similarity index 84%
rename from external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
rename to external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java
index 46a865b..d0a2dea 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java
@@ -18,31 +18,33 @@
package org.apache.storm.kinesis.spout;
+import java.nio.charset.Charset;
+import java.util.Map;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.json.simple.JSONValue;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-class ZKConnection {
+class ZkConnection {
private final ZkInfo zkInfo;
private CuratorFramework curatorFramework;
- ZKConnection (ZkInfo zkInfo) {
+ ZkConnection(ZkInfo zkInfo) {
this.zkInfo = zkInfo;
}
- void initialize () {
- curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
- RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
+ void initialize() {
+ curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(),
+ zkInfo.getSessionTimeoutMs(),
+ zkInfo.getConnectionTimeoutMs(),
+ new RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
curatorFramework.start();
}
- void commitState (String stream, String shardId, Map<Object, Object> state) {
+ void commitState(String stream, String shardId, Map<Object, Object> state) {
byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
try {
String path = getZkPath(stream, shardId);
@@ -59,7 +61,7 @@ class ZKConnection {
}
}
- Map<Object, Object> readState (String stream, String shardId) {
+ Map<Object, Object> readState(String stream, String shardId) {
try {
String path = getZkPath(stream, shardId);
Map<Object, Object> state = null;
@@ -76,11 +78,11 @@ class ZKConnection {
}
}
- void shutdown () {
+ void shutdown() {
curatorFramework.close();
}
- private String getZkPath (String stream, String shardId) {
+ private String getZkPath(String stream, String shardId) {
String path = "";
if (!zkInfo.getZkNode().startsWith("/")) {
path += "/";
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
index a47f0ab..35e2890 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
@@ -36,8 +36,13 @@ public class ZkInfo implements Serializable {
// time to sleep between retries in milliseconds
private final Integer retryIntervalMs;
- public ZkInfo (String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs, Integer retryAttempts, Integer
- retryIntervalMs) {
+ public ZkInfo(String zkUrl,
+ String zkNode,
+ Integer sessionTimeoutMs,
+ Integer connectionTimeoutMs,
+ Long commitIntervalMs,
+ Integer retryAttempts,
+ Integer retryIntervalMs) {
this.zkUrl = zkUrl;
this.zkNode = zkNode;
this.sessionTimeoutMs = sessionTimeoutMs;
@@ -76,7 +81,7 @@ public class ZkInfo implements Serializable {
return retryIntervalMs;
}
- private void validate () {
+ private void validate() {
if (zkUrl == null || zkUrl.length() < 1) {
throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper");
@@ -91,12 +96,13 @@ public class ZkInfo implements Serializable {
checkPositive(retryIntervalMs, "retryIntervalMs");
}
- private void checkPositive (Integer argument, String name) {
+ private void checkPositive(Integer argument, String name) {
if (argument == null && argument <= 0) {
throw new IllegalArgumentException(name + " must be positive");
}
}
- private void checkPositive (Long argument, String name) {
+
+ private void checkPositive(Long argument, String name) {
if (argument == null && argument <= 0) {
throw new IllegalArgumentException(name + " must be positive");
}
@@ -104,15 +110,15 @@ public class ZkInfo implements Serializable {
@Override
public String toString() {
- return "ZkInfo{" +
- "zkUrl='" + zkUrl + '\'' +
- ", zkNode='" + zkNode + '\'' +
- ", sessionTimeoutMs=" + sessionTimeoutMs +
- ", connectionTimeoutMs=" + connectionTimeoutMs +
- ", commitIntervalMs=" + commitIntervalMs +
- ", retryAttempts=" + retryAttempts +
- ", retryIntervalMs=" + retryIntervalMs +
- '}';
+ return "ZkInfo{"
+ + "zkUrl='" + zkUrl + '\''
+ + ", zkNode='" + zkNode + '\''
+ + ", sessionTimeoutMs=" + sessionTimeoutMs
+ + ", connectionTimeoutMs=" + connectionTimeoutMs
+ + ", commitIntervalMs=" + commitIntervalMs
+ + ", retryAttempts=" + retryAttempts
+ + ", retryIntervalMs=" + retryIntervalMs
+ + '}';
}
}