You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/06/27 17:42:02 UTC

[storm] branch master updated: STORM-3421: Fix checkstyle violations in storm-kinesis

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 34750d5  STORM-3421: Fix checkstyle violations in storm-kinesis
     new f617c6e  Merge pull request #3036 from krichter722/checkstyle-kinesis
34750d5 is described below

commit 34750d557b82cbdfdef379ab9b7162142a06b169
Author: Karl-Philipp Richter <kr...@posteo.de>
AuthorDate: Mon Jun 24 01:15:03 2019 +0200

    STORM-3421: Fix checkstyle violations in storm-kinesis
---
 external/storm-kinesis/pom.xml                     |   2 +-
 .../kinesis/spout/CredentialsProviderChain.java    |   5 +-
 .../kinesis/spout/ExponentialBackoffRetrier.java   |  33 +++--
 .../kinesis/spout/FailedMessageRetryHandler.java   |  22 +--
 .../apache/storm/kinesis/spout/KinesisConfig.java  |  59 ++++----
 .../storm/kinesis/spout/KinesisConnection.java     |  43 ++++--
 .../storm/kinesis/spout/KinesisConnectionInfo.java |  26 ++--
 .../storm/kinesis/spout/KinesisMessageId.java      |  40 +++---
 .../storm/kinesis/spout/KinesisRecordsManager.java | 157 +++++++++++++--------
 .../apache/storm/kinesis/spout/KinesisSpout.java   |   8 +-
 .../storm/kinesis/spout/RecordToTupleMapper.java   |  12 +-
 .../spout/{ZKConnection.java => ZkConnection.java} |  26 ++--
 .../org/apache/storm/kinesis/spout/ZkInfo.java     |  34 +++--
 13 files changed, 279 insertions(+), 188 deletions(-)

diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml
index 6f04950..39c89f1 100644
--- a/external/storm-kinesis/pom.xml
+++ b/external/storm-kinesis/pom.xml
@@ -62,7 +62,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>185</maxAllowedViolations>
+                    <maxAllowedViolations>0</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
index 4287ae0..5820851 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/CredentialsProviderChain.java
@@ -26,10 +26,11 @@ import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 
 /**
- * Class representing chain of mechanisms that will be used in order to connect to kinesis
+ * Class representing chain of mechanisms that will be used in order to connect to kinesis.
  */
 public class CredentialsProviderChain extends AWSCredentialsProviderChain {
-    public CredentialsProviderChain () {
+
+    public CredentialsProviderChain() {
         super(new EnvironmentVariableCredentialsProvider(),
                 new SystemPropertiesCredentialsProvider(),
                 new ClasspathPropertiesFileCredentialsProvider(),
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
index 88e8d70..2920913 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
@@ -18,9 +18,6 @@
 
 package org.apache.storm.kinesis.spout;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.Serializable;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -28,6 +25,9 @@ import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
     // Wait interfal for retrying after first failure
@@ -44,29 +44,29 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
     private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
 
     /**
-     * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) secs for
-     * retry i where i = 2,3,
+     * No args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an
+     * exponential backoff of {@code Math.pow(2,i-1)} secs for retry {@code i} where {@code i = 2,3,...}.
      */
-    public ExponentialBackoffRetrier () {
+    public ExponentialBackoffRetrier() {
         this(100L, 2L, Long.MAX_VALUE);
     }
 
     /**
-     *
+     * Creates a new exponential backoff retrier.
      * @param initialDelayMillis delay in milliseconds for first retry
      * @param baseSeconds base for exponent function in seconds
      * @param maxRetries maximum number of retries before the record is discarded/acked
      */
-    public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
+    public ExponentialBackoffRetrier(Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
         this.initialDelayMillis = initialDelayMillis;
         this.baseSeconds = baseSeconds;
         this.maxRetries = maxRetries;
         validate();
     }
 
-    private void validate () {
+    private void validate() {
         if (initialDelayMillis < 0) {
-            throw new IllegalArgumentException("initialDelayMillis cannot be negative." );
+            throw new IllegalArgumentException("initialDelayMillis cannot be negative.");
         }
         if (baseSeconds < 0) {
             throw new IllegalArgumentException("baseSeconds cannot be negative.");
@@ -75,6 +75,7 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
             throw new IllegalArgumentException("maxRetries cannot be negative.");
         }
     }
+
     @Override
     public boolean failed(KinesisMessageId messageId) {
         LOG.debug("Handling failed message {}", messageId);
@@ -114,7 +115,7 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
     public KinesisMessageId getNextFailedMessageToRetry() {
         KinesisMessageId result = null;
         // return the first message to be retried from the set. It will return the message with the earliest retry time <= current time
-        if (!retryMessageSet.isEmpty() ) {
+        if (!retryMessageSet.isEmpty()) {
             result = retryMessageSet.first();
             if (!(retryTimes.get(result) <= System.nanoTime())) {
                 result = null;
@@ -126,15 +127,19 @@ public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Ser
 
     @Override
     public void failedMessageEmitted(KinesisMessageId messageId) {
-        // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and wait for its ack or fail
+        // spout notified that message returned by us for retrying was actually emitted. hence remove it from set and
+        // wait for its ack or fail
         // but still keep it in counts map to retry again on failure or remove on ack
         LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", messageId);
         retryMessageSet.remove(messageId);
         retryTimes.remove(messageId);
     }
 
-    // private helper method to get next retry time for retry attempt i (handles long overflow as well by capping it to Long.MAX_VALUE)
-    private Long getRetryTime (Long retryNum) {
+    /**
+     * Private helper method to get the next retry time for retry attempt {@code retryNum} (handles long overflow as
+     * well by capping it to {@code Long.MAX_VALUE}).
+     */
+    private Long getRetryTime(Long retryNum) {
         Long retryTime = System.nanoTime();
         Long nanoMultiplierForMillis = 1000000L;
         // if first retry then retry time  = current time  + initial delay
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
index bb0e450..c005c59 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/FailedMessageRetryHandler.java
@@ -22,27 +22,27 @@ import java.io.Serializable;
 
 public interface FailedMessageRetryHandler extends Serializable {
     /**
-     * message with messageId failed in the spout
-     * @param messageId
+     * message with messageId failed in the spout.
+     * @param messageId the message id
      * @return true if this failed message was scheduled to be retried, false otherwise
      */
-    boolean failed (KinesisMessageId messageId);
+    boolean failed(KinesisMessageId messageId);
 
     /**
-     * message with messageId succeeded/acked in the spout
-     * @param messageId
+     * message with messageId succeeded/acked in the spout.
+     * @param messageId the message id
      */
-    void acked (KinesisMessageId messageId);
+    void acked(KinesisMessageId messageId);
 
     /**
-     * Get the next failed message's id to retry if any, null otherwise
+     * Get the next failed message's id to retry if any, null otherwise.
      * @return messageId
      */
-    KinesisMessageId getNextFailedMessageToRetry ();
+    KinesisMessageId getNextFailedMessageToRetry();
 
     /**
-     * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout
-     * @param messageId
+     * message with messageId returned by last call to getNextFailedMessageToRetry was emitted/retried by the spout.
+     * @param messageId the message id
      */
-    void failedMessageEmitted (KinesisMessageId messageId);
+    void failedMessageEmitted(KinesisMessageId messageId);
 }
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
index 744aef5..eeb5896 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConfig.java
@@ -38,13 +38,22 @@ public class KinesisConfig implements Serializable {
     private final ZkInfo zkInfo;
     // object representing information on paramaters to use while connecting to kinesis using kinesis client
     private final KinesisConnectionInfo kinesisConnectionInfo;
-    // this number represents the number of messages that are still not committed to zk. it will prevent the spout from emitting further.
-    // for e.g. if 1 failed and 2,3,4,5..... all have been acked by storm, they still cant be committed to zk because 1 is still in failed set. As a result
-    // the acked queue can infinitely grow without any of them being committed to zk. topology max pending does not help since from storm's view they are acked
+    /**
+     * This number represents the number of messages that are still not committed to zk. It will prevent the spout from
+     * emitting further. E.g. if 1 failed and 2,3,4,5,... have all been acked by storm, they still can't be
+     * committed to zk because 1 is still in the failed set. As a result the acked queue can grow infinitely without any
+     * of them being committed to zk. Topology max pending does not help since from storm's view they are acked.
+     */
     private final Long maxUncommittedRecords;
 
-    public KinesisConfig(String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp, FailedMessageRetryHandler
-            failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) {
+    public KinesisConfig(String streamName,
+            ShardIteratorType shardIteratorType,
+            RecordToTupleMapper recordToTupleMapper,
+            Date timestamp,
+            FailedMessageRetryHandler failedMessageRetryHandler,
+            ZkInfo zkInfo,
+            KinesisConnectionInfo kinesisConnectionInfo,
+            Long maxUncommittedRecords) {
         this.streamName = streamName;
         this.shardIteratorType = shardIteratorType;
         this.recordToTupleMapper = recordToTupleMapper;
@@ -56,14 +65,16 @@ public class KinesisConfig implements Serializable {
         validate();
     }
 
-    private void validate () {
+    private void validate() {
         if (streamName == null || streamName.length() < 1) {
             throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
         }
-        if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType
-                .AT_SEQUENCE_NUMBER)) {
-            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST +
-                    "," + ShardIteratorType.TRIM_HORIZON);
+        if (shardIteratorType == null
+                || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP
+                    + "," + ShardIteratorType.LATEST
+                    + "," + ShardIteratorType.TRIM_HORIZON);
         }
         if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) {
             throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
@@ -101,33 +112,33 @@ public class KinesisConfig implements Serializable {
         return timestamp;
     }
 
-    public FailedMessageRetryHandler getFailedMessageRetryHandler () {
+    public FailedMessageRetryHandler getFailedMessageRetryHandler() {
         return failedMessageRetryHandler;
     }
 
-    public ZkInfo getZkInfo () {
+    public ZkInfo getZkInfo() {
         return zkInfo;
     }
 
-    public KinesisConnectionInfo getKinesisConnectionInfo () {
+    public KinesisConnectionInfo getKinesisConnectionInfo() {
         return kinesisConnectionInfo;
     }
 
-    public Long getMaxUncommittedRecords () {
+    public Long getMaxUncommittedRecords() {
         return maxUncommittedRecords;
     }
 
     @Override
     public String toString() {
-        return "KinesisConfig{" +
-                "streamName='" + streamName + '\'' +
-                ", shardIteratorType=" + shardIteratorType +
-                ", recordToTupleMapper=" + recordToTupleMapper +
-                ", timestamp=" + timestamp +
-                ", zkInfo=" + zkInfo +
-                ", kinesisConnectionInfo=" + kinesisConnectionInfo +
-                ", failedMessageRetryHandler =" + failedMessageRetryHandler +
-                ", maxUncommittedRecords=" + maxUncommittedRecords +
-                '}';
+        return "KinesisConfig{"
+                + "streamName='" + streamName + '\''
+                + ", shardIteratorType=" + shardIteratorType
+                + ", recordToTupleMapper=" + recordToTupleMapper
+                + ", timestamp=" + timestamp
+                + ", zkInfo=" + zkInfo
+                + ", kinesisConnectionInfo=" + kinesisConnectionInfo
+                + ", failedMessageRetryHandler =" + failedMessageRetryHandler
+                + ", maxUncommittedRecords=" + maxUncommittedRecords
+                + '}';
     }
 }
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
index dfd9049..b8b969a 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnection.java
@@ -28,28 +28,30 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 class KinesisConnection {
     private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
     private final KinesisConnectionInfo kinesisConnectionInfo;
     private AmazonKinesisClient kinesisClient;
 
-    KinesisConnection (KinesisConnectionInfo kinesisConnectionInfo) {
+    KinesisConnection(KinesisConnectionInfo kinesisConnectionInfo) {
         this.kinesisConnectionInfo = kinesisConnectionInfo;
     }
 
-    void initialize () {
-        kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(), kinesisConnectionInfo.getClientConfiguration());
+    void initialize() {
+        kinesisClient = new AmazonKinesisClient(kinesisConnectionInfo.getCredentialsProvider(),
+                kinesisConnectionInfo.getClientConfiguration());
         kinesisClient.setRegion(Region.getRegion(kinesisConnectionInfo.getRegion()));
     }
 
-    List<Shard> getShardsForStream (String stream) {
+    List<Shard> getShardsForStream(String stream) {
         DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
         describeStreamRequest.setStreamName(stream);
         List<Shard> shards = new ArrayList<>();
@@ -63,19 +65,24 @@ class KinesisConnection {
             } else {
                 exclusiveStartShardId = null;
             }
-        } while ( exclusiveStartShardId != null );
+        } while (exclusiveStartShardId != null);
         LOG.info("Number of shards for stream " + stream + " are " + shards.size());
         return shards;
     }
 
-    String getShardIterator (String stream, String shardId, ShardIteratorType shardIteratorType, String sequenceNumber, Date timestamp) {
+    String getShardIterator(String stream,
+            String shardId,
+            ShardIteratorType shardIteratorType,
+            String sequenceNumber,
+            Date timestamp) {
         String shardIterator = "";
         try {
             GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
             getShardIteratorRequest.setStreamName(stream);
             getShardIteratorRequest.setShardId(shardId);
             getShardIteratorRequest.setShardIteratorType(shardIteratorType);
-            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+            if (shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                    || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
                 getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber);
             } else if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP)) {
                 getShardIteratorRequest.setTimestamp(timestamp);
@@ -85,15 +92,21 @@ class KinesisConnection {
                 shardIterator = getShardIteratorResult.getShardIterator();
             }
         } catch (Exception e) {
-            LOG.warn("Exception occured while getting shardIterator for shard " + shardId + " shardIteratorType " + shardIteratorType + " sequence number " +
-                    sequenceNumber + " timestamp " + timestamp, e);
+            LOG.warn("Exception occured while getting shardIterator for shard " + shardId
+                    + " shardIteratorType " + shardIteratorType
+                    + " sequence number " + sequenceNumber
+                    + " timestamp " + timestamp,
+                    e);
         }
-        LOG.warn("Returning shardIterator " + shardIterator + " for shardId " + shardId + " shardIteratorType " + shardIteratorType + " sequenceNumber " +
-                sequenceNumber + " timestamp" + timestamp);
+        LOG.warn("Returning shardIterator " + shardIterator
+                + " for shardId " + shardId
+                + " shardIteratorType " + shardIteratorType
+                + " sequenceNumber " + sequenceNumber
+                + " timestamp" + timestamp);
         return shardIterator;
     }
 
-    GetRecordsResult fetchRecords (String shardIterator) {
+    GetRecordsResult fetchRecords(String shardIterator) {
         GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
         getRecordsRequest.setShardIterator(shardIterator);
         getRecordsRequest.setLimit(kinesisConnectionInfo.getRecordsLimit());
@@ -101,7 +114,7 @@ class KinesisConnection {
         return getRecordsResult;
     }
 
-    void shutdown () {
+    void shutdown() {
         kinesisClient.shutdown();
     }
 
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
index 67ca29f..121e0bf 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
@@ -24,13 +24,14 @@ import com.amazonaws.regions.Regions;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
 public class KinesisConnectionInfo implements Serializable {
     private final byte[] serializedKinesisCredsProvider;
     private final byte[] serializedkinesisClientConfig;
@@ -41,13 +42,16 @@ public class KinesisConnectionInfo implements Serializable {
     private transient ClientConfiguration clientConfiguration;
 
     /**
-     *
+     * Create a new Kinesis connection info.
      * @param credentialsProvider implementation to provide credentials to connect to kinesis
      * @param clientConfiguration client configuration to pass to kinesis client
      * @param region region to connect to
      * @param recordsLimit max records to be fetched in a getRecords request to kinesis
      */
-    public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) {
+    public KinesisConnectionInfo(AWSCredentialsProvider credentialsProvider,
+            ClientConfiguration clientConfiguration,
+            Regions region,
+            Integer recordsLimit) {
         if (recordsLimit == null || recordsLimit <= 0) {
             throw new IllegalArgumentException("recordsLimit has to be a positive integer");
         }
@@ -82,7 +86,7 @@ public class KinesisConnectionInfo implements Serializable {
         return region;
     }
 
-    private byte[] getKryoSerializedBytes (final Object obj) {
+    private byte[] getKryoSerializedBytes(final Object obj) {
         final Kryo kryo = new Kryo();
         final ByteArrayOutputStream os = new ByteArrayOutputStream();
         final Output output = new Output(os);
@@ -92,7 +96,7 @@ public class KinesisConnectionInfo implements Serializable {
         return os.toByteArray();
     }
 
-    private Object getKryoDeserializedObject (final byte[] ser) {
+    private Object getKryoDeserializedObject(final byte[] ser) {
         final Kryo kryo = new Kryo();
         final Input input = new Input(new ByteArrayInputStream(ser));
         kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
@@ -101,11 +105,11 @@ public class KinesisConnectionInfo implements Serializable {
 
     @Override
     public String toString() {
-        return "KinesisConnectionInfo{" +
-                "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider) +
-                ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig) +
-                ", region=" + region +
-                ", recordsLimit=" + recordsLimit +
-                '}';
+        return "KinesisConnectionInfo{"
+                + "serializedKinesisCredsProvider=" + Arrays.toString(serializedKinesisCredsProvider)
+                + ", serializedkinesisClientConfig=" + Arrays.toString(serializedkinesisClientConfig)
+                + ", region=" + region
+                + ", recordsLimit=" + recordsLimit
+                + '}';
     }
 }
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
index dd239f1..7fd179f 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisMessageId.java
@@ -23,48 +23,56 @@ public class KinesisMessageId {
     private final String shardId;
     private final String sequenceNumber;
 
-    KinesisMessageId (String streamName, String shardId, String sequenceNumber) {
+    KinesisMessageId(String streamName, String shardId, String sequenceNumber) {
         this.streamName = streamName;
         this.shardId = shardId;
         this.sequenceNumber = sequenceNumber;
     }
 
-    public String getStreamName () {
+    public String getStreamName() {
         return streamName;
     }
 
-    public String getShardId () {
+    public String getShardId() {
         return shardId;
     }
 
-    public String getSequenceNumber () {
+    public String getSequenceNumber() {
         return sequenceNumber;
     }
 
     @Override
-    public String toString () {
-        return "KinesisMessageId{" +
-                "streamName='" + streamName + '\'' +
-                ", shardId='" + shardId + '\'' +
-                ", sequenceNumber='" + sequenceNumber + '\'' +
-                '}';
+    public String toString() {
+        return "KinesisMessageId{"
+                + "streamName='" + streamName + '\''
+                + ", shardId='" + shardId + '\''
+                + ", sequenceNumber='" + sequenceNumber + '\''
+                + '}';
     }
 
     @Override
-    public boolean equals (Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
 
         KinesisMessageId that = (KinesisMessageId) o;
 
-        if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) return false;
-        if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) return false;
+        if (streamName != null ? !streamName.equals(that.streamName) : that.streamName != null) {
+            return false;
+        }
+        if (shardId != null ? !shardId.equals(that.shardId) : that.shardId != null) {
+            return false;
+        }
         return !(sequenceNumber != null ? !sequenceNumber.equals(that.sequenceNumber) : that.sequenceNumber != null);
 
     }
 
     @Override
-    public int hashCode () {
+    public int hashCode() {
         int result = streamName != null ? streamName.hashCode() : 0;
         result = 31 * result + (shardId != null ? shardId.hashCode() : 0);
         result = 31 * result + (sequenceNumber != null ? sequenceNumber.hashCode() : 0);
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
index 7831193..33dba4a 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
@@ -24,9 +24,6 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
 import java.util.HashMap;
@@ -36,20 +33,27 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 
+import org.apache.storm.spout.SpoutOutputCollector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 class KinesisRecordsManager {
     private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
     // object handling zk interaction
-    private transient ZKConnection zkConnection;
+    private transient ZkConnection zkConnection;
     // object handling interaction with kinesis
     private transient KinesisConnection kinesisConnection;
     // Kinesis Spout KinesisConfig object
-    private transient final KinesisConfig kinesisConfig;
+    private final transient KinesisConfig kinesisConfig;
     // Queue of records per shard fetched from kinesis and are waiting to be emitted
     private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
     // Map of records  that were fetched from kinesis as a result of failure and are waiting to be emitted
     private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>();
-    // Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the
-    // sequence number to commit. Logic explained in commit
+    /**
+     * Sequence numbers per shard that have been emitted. A TreeSet is used as we need to remove on ack or fail.
+     * At the same time order is needed to figure out the sequence number to commit. Logic explained in commit.
+     */
     private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
     // sorted acked sequence numbers - needed to figure out what sequence number can be committed
     private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
@@ -66,13 +70,13 @@ class KinesisRecordsManager {
     // boolean to track deactivated state
     private transient boolean deactivated;
 
-    KinesisRecordsManager (KinesisConfig kinesisConfig) {
+    KinesisRecordsManager(KinesisConfig kinesisConfig) {
         this.kinesisConfig = kinesisConfig;
-        this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+        this.zkConnection = new ZkConnection(kinesisConfig.getZkInfo());
         this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
     }
 
-    void initialize (int myTaskIndex, int totalTasks) {
+    void initialize(int myTaskIndex, int totalTasks) {
         deactivated = false;
         lastCommitTime = System.currentTimeMillis();
         kinesisConnection.initialize();
@@ -90,7 +94,7 @@ class KinesisRecordsManager {
         refreshShardIteratorsForNewRecords();
     }
 
-    void next (SpoutOutputCollector collector) {
+    void next(SpoutOutputCollector collector) {
         if (shouldCommit()) {
             commit();
         }
@@ -98,7 +102,8 @@ class KinesisRecordsManager {
         if (failedMessageId  != null) {
             // if the retry service returns a message that is not in failed set then ignore it. should never happen
             BigInteger failedSequenceNumber = new BigInteger(failedMessageId.getSequenceNumber());
-            if (failedPerShard.containsKey(failedMessageId.getShardId()) && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) {
+            if (failedPerShard.containsKey(failedMessageId.getShardId())
+                    && failedPerShard.get(failedMessageId.getShardId()).contains(failedSequenceNumber)) {
                 if (!failedandFetchedRecords.containsKey(failedMessageId)) {
                     fetchFailedRecords(failedMessageId);
                 }
@@ -107,8 +112,8 @@ class KinesisRecordsManager {
                     kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(failedMessageId);
                     return;
                 } else {
-                    LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId + ". This can happen a few times but should not happen " +
-                            "infinitely");
+                    LOG.warn("failedMessageEmitted not called on retrier for " + failedMessageId
+                            + ". This can happen a few times but should not happen infinitely");
                 }
             } else {
                 LOG.warn("failedPerShard does not contain " + failedMessageId + ". This should never happen.");
@@ -132,29 +137,34 @@ class KinesisRecordsManager {
         emitNewRecord(collector);
     }
 
-    void ack (KinesisMessageId kinesisMessageId) {
+    void ack(KinesisMessageId kinesisMessageId) {
         // for an acked message add it to acked set and remove it from emitted and failed
         String shardId = kinesisMessageId.getShardId();
         BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
         LOG.debug("Ack received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber);
-        // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while committing we need to figure out what is the
+        // if an ack is received for a message then add it to the ackedPerShard TreeSet. TreeSet because while
+        // committing we need to figure out what is the
         // highest sequence number that can be committed for this shard
         if (!ackedPerShard.containsKey(shardId)) {
             ackedPerShard.put(shardId, new TreeSet<BigInteger>());
         }
         ackedPerShard.get(shardId).add(sequenceNumber);
-        // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard(which keeps track of in flight tuples)
+        // if the acked message was in emittedPerShard that means we need to remove it from the emittedPerShard (which
+        // keeps track of in flight tuples)
         if (emittedPerShard.containsKey(shardId)) {
             TreeSet<BigInteger> emitted = emittedPerShard.get(shardId);
             emitted.remove(sequenceNumber);
         }
-        // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard from failedPerShard. Defensive coding.
+        // an acked message should not be in failed since if it fails and gets re-emitted it moves to emittedPerShard
+        // from failedPerShard. Defensive coding.
         // Remove it from failedPerShard anyway
         if (failedPerShard.containsKey(shardId)) {
             failedPerShard.get(shardId).remove(sequenceNumber);
         }
-        // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in failedAndFetchedRecords. We use that to
-        // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to clean up memory
+        // if an ack is for a message that failed once at least and was re-emitted then the record itself will be in
+        // failedAndFetchedRecords. We use that to
+        // determine if the FailedMessageRetryHandler needs to be told about it and then remove the record itself to
+        // clean up memory
         if (failedandFetchedRecords.containsKey(kinesisMessageId)) {
             kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
             failedandFetchedRecords.remove(kinesisMessageId);
@@ -165,7 +175,7 @@ class KinesisRecordsManager {
         }
     }
 
-    void fail (KinesisMessageId kinesisMessageId) {
+    void fail(KinesisMessageId kinesisMessageId) {
         String shardId = kinesisMessageId.getShardId();
         BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
         LOG.debug("Fail received for shardId: {} sequenceNumber: {}", shardId, sequenceNumber);
@@ -186,13 +196,18 @@ class KinesisRecordsManager {
         }
     }
 
-    void commit () {
-        // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number can be committed to zookeeper.
-        // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack it moves from emittedPerShard to
-        // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to failedPerShard. The failed records will move from
+    void commit() {
+        // We have three mutually disjoint treesets per shard at any given time to keep track of what sequence number
+        // can be committed to zookeeper.
+        // emittedPerShard, ackedPerShard and failedPerShard. Any record starts by entering emittedPerShard. On ack
+        // it moves from emittedPerShard to
+        // ackedPerShard and on fail if retry service tells us to retry then it moves from emittedPerShard to
+        // failedPerShard. The failed records will move from
         // failedPerShard to emittedPerShard when the failed record is emitted again as a retry.
-        // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard called X such that there is no sequence
-        // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5, emittedPerShard is 2,6 and
+        // Logic for deciding what sequence number to commit is find the highest sequence number from ackedPerShard
+        // called X such that there is no sequence
+        // number Y in emittedPerShard or failedPerShard that satisfies X > Y. For e.g. if ackedPerShard is 1,4,5,
+        // emittedPerShard is 2,6 and
         // failedPerShard is 3,7 then we can only commit 1 and not 4 because 2 is still pending and 3 has failed
         for (String shardId: toEmitPerShard.keySet()) {
             if (ackedPerShard.containsKey(shardId)) {
@@ -202,7 +217,8 @@ class KinesisRecordsManager {
                 }
                 if (emittedPerShard.containsKey(shardId) && !emittedPerShard.get(shardId).isEmpty()) {
                     BigInteger smallestEmittedSequenceNumber = emittedPerShard.get(shardId).first();
-                    if (commitSequenceNumberBound == null || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) {
+                    if (commitSequenceNumberBound == null
+                            || (commitSequenceNumberBound.compareTo(smallestEmittedSequenceNumber) == 1)) {
                         commitSequenceNumberBound = smallestEmittedSequenceNumber;
                     }
                 }
@@ -210,7 +226,8 @@ class KinesisRecordsManager {
                 BigInteger ackedSequenceNumberToCommit = null;
                 while (ackedSequenceNumbers.hasNext()) {
                     BigInteger ackedSequenceNumber = ackedSequenceNumbers.next();
-                    if (commitSequenceNumberBound == null || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) {
+                    if (commitSequenceNumberBound == null
+                            || (commitSequenceNumberBound.compareTo(ackedSequenceNumber) == 1)) {
                         ackedSequenceNumberToCommit = ackedSequenceNumber;
                         ackedSequenceNumbers.remove();
                     } else {
@@ -220,7 +237,9 @@ class KinesisRecordsManager {
                 if (ackedSequenceNumberToCommit != null) {
                     Map<Object, Object> state = new HashMap<>();
                     state.put("committedSequenceNumber", ackedSequenceNumberToCommit.toString());
-                    LOG.debug("Committing sequence number {} for shardId {}", ackedSequenceNumberToCommit.toString(), shardId);
+                    LOG.debug("Committing sequence number {} for shardId {}",
+                            ackedSequenceNumberToCommit.toString(),
+                            shardId);
                     zkConnection.commitState(kinesisConfig.getStreamName(), shardId, state);
                 }
             }
@@ -228,34 +247,38 @@ class KinesisRecordsManager {
         lastCommitTime = System.currentTimeMillis();
     }
 
-    void activate () {
+    void activate() {
         LOG.info("Activate called");
         deactivated = false;
         kinesisConnection.initialize();
     }
 
-    void deactivate () {
+    void deactivate() {
         LOG.info("Deactivate called");
         deactivated = true;
         commit();
         kinesisConnection.shutdown();
     }
 
-    void close () {
+    void close() {
         commit();
         kinesisConnection.shutdown();
         zkConnection.shutdown();
     }
 
-    // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched and are in the failed queue will also
+    // fetch records from kinesis starting at sequence number for message passed as argument. Any other messages fetched
+    // and are in the failed queue will also
     // be kept in memory to avoid going to kinesis again for retry
-    private void fetchFailedRecords (KinesisMessageId kinesisMessageId) {
+    private void fetchFailedRecords(KinesisMessageId kinesisMessageId) {
         // if shard iterator not present for this message, get it
         if (!shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
             refreshShardIteratorForFailedRecord(kinesisMessageId);
         }
         String shardIterator = shardIteratorPerFailedMessage.get(kinesisMessageId);
-        LOG.debug("Fetching failed records for shard id :{} at sequence number {} using shardIterator {}", kinesisMessageId.getShardId(), kinesisMessageId.getSequenceNumber(), shardIterator);
+        LOG.debug("Fetching failed records for shard id :{} at sequence number {} using shardIterator {}",
+                kinesisMessageId.getShardId(),
+                kinesisMessageId.getSequenceNumber(),
+                shardIterator);
         try {
             GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
             if (getRecordsResult != null) {
@@ -269,7 +292,9 @@ class KinesisRecordsManager {
                 } else {
                     // add all fetched records to the set of failed records if they are present in failed set
                     for (Record record: records) {
-                        KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), kinesisMessageId.getShardId(), record.getSequenceNumber());
+                        KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(),
+                                kinesisMessageId.getShardId(),
+                                record.getSequenceNumber());
                         if (failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(current.getSequenceNumber()))) {
                             failedandFetchedRecords.put(current, record);
                             shardIteratorPerFailedMessage.remove(current);
@@ -292,12 +317,15 @@ class KinesisRecordsManager {
         }
     }
 
-    private void fetchNewRecords () {
+    private void fetchNewRecords() {
         for (Map.Entry<String, LinkedList<Record>> entry : toEmitPerShard.entrySet()) {
             String shardId = entry.getKey();
             try {
                 String shardIterator = shardIteratorPerShard.get(shardId);
-                LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}", shardId, shardIterator, fetchedSequenceNumberPerShard.get(shardId));
+                LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}",
+                        shardId,
+                        shardIterator,
+                        fetchedSequenceNumberPerShard.get(shardId));
                 GetRecordsResult getRecordsResult = kinesisConnection.fetchRecords(shardIterator);
                 if (getRecordsResult != null) {
                     List<Record> records = getRecordsResult.getRecords();
@@ -328,28 +356,30 @@ class KinesisRecordsManager {
         }
     }
 
-    private void emitNewRecord (SpoutOutputCollector collector) {
+    private void emitNewRecord(SpoutOutputCollector collector) {
         for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
             String shardId = entry.getKey();
             LinkedList<Record> listOfRecords = entry.getValue();
             Record record;
             while ((record = listOfRecords.pollFirst()) != null) {
-                KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(), shardId, record.getSequenceNumber());
+                KinesisMessageId kinesisMessageId = new KinesisMessageId(kinesisConfig.getStreamName(),
+                        shardId,
+                        record.getSequenceNumber());
                 if (emitRecord(collector, record, kinesisMessageId)) {
-                   return;
+                    return;
                 }
             }
         }
     }
 
-    private boolean emitFailedRecord (SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
+    private boolean emitFailedRecord(SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
         if (!failedandFetchedRecords.containsKey(kinesisMessageId)) {
             return false;
         }
         return emitRecord(collector, failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId);
     }
 
-    private boolean emitRecord (SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
+    private boolean emitRecord(SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
         boolean result = false;
         List<Object> tuple = kinesisConfig.getRecordToTupleMapper().getTuple(record);
         // if a record is returned put the sequence number in the emittedPerShard to tie back with ack or fail
@@ -368,11 +398,11 @@ class KinesisRecordsManager {
         return result;
     }
 
-    private boolean shouldCommit () {
+    private boolean shouldCommit() {
         return (System.currentTimeMillis() - lastCommitTime >= kinesisConfig.getZkInfo().getCommitIntervalMs());
     }
 
-    private void initializeFetchedSequenceNumbers () {
+    private void initializeFetchedSequenceNumbers() {
         for (String shardId : toEmitPerShard.keySet()) {
             Map<Object, Object> state = zkConnection.readState(kinesisConfig.getStreamName(), shardId);
             // if state found for this shard in zk, then set the sequence number in fetchedSequenceNumber
@@ -386,38 +416,47 @@ class KinesisRecordsManager {
         }
     }
 
-    private void refreshShardIteratorsForNewRecords () {
+    private void refreshShardIteratorsForNewRecords() {
         for (String shardId: toEmitPerShard.keySet()) {
             refreshShardIteratorForNewRecords(shardId);
         }
     }
 
-    private void refreshShardIteratorForNewRecords (String shardId) {
+    private void refreshShardIteratorForNewRecords(String shardId) {
         String shardIterator = null;
         String lastFetchedSequenceNumber = fetchedSequenceNumberPerShard.get(shardId);
-        ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null ? kinesisConfig.getShardIteratorType() : ShardIteratorType
-                .AFTER_SEQUENCE_NUMBER);
+        ShardIteratorType shardIteratorType = (lastFetchedSequenceNumber == null
+                ? kinesisConfig.getShardIteratorType()
+                : ShardIteratorType.AFTER_SEQUENCE_NUMBER);
         // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), shardId, shardIteratorType, lastFetchedSequenceNumber, kinesisConfig
-                .getTimestamp());
+        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(),
+                shardId,
+                shardIteratorType,
+                lastFetchedSequenceNumber,
+                kinesisConfig.getTimestamp());
         if (shardIterator != null && !shardIterator.isEmpty()) {
-            LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + shardIterator);
+            LOG.warn("Refreshing shard iterator for new records for shardId " + shardId
+                    + " with shardIterator " + shardIterator);
             shardIteratorPerShard.put(shardId, shardIterator);
         }
     }
 
-    private void refreshShardIteratorForFailedRecord (KinesisMessageId kinesisMessageId) {
+    private void refreshShardIteratorForFailedRecord(KinesisMessageId kinesisMessageId) {
         String shardIterator = null;
         // Set the shard iterator for last fetched sequence number to start from correct position in shard
-        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(), kinesisMessageId.getShardId(), ShardIteratorType
-                .AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
+        shardIterator = kinesisConnection.getShardIterator(kinesisConfig.getStreamName(),
+                kinesisMessageId.getShardId(),
+                ShardIteratorType.AT_SEQUENCE_NUMBER,
+                kinesisMessageId.getSequenceNumber(),
+                null);
         if (shardIterator != null && !shardIterator.isEmpty()) {
-            LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
+            LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId
+                    + " with shardIterator " + shardIterator);
             shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
         }
     }
 
-    private Long getUncommittedRecordsCount () {
+    private Long getUncommittedRecordsCount() {
         Long result = 0L;
         for (Map.Entry<String, TreeSet<BigInteger>> emitted: emittedPerShard.entrySet()) {
             result += emitted.getValue().size();
@@ -432,7 +471,7 @@ class KinesisRecordsManager {
         return result;
     }
 
-    private boolean shouldFetchNewRecords () {
+    private boolean shouldFetchNewRecords() {
         // check to see if any shard has already fetched records waiting to be emitted, in which case dont fetch more
         boolean fetchRecords = true;
         for (Map.Entry<String, LinkedList<Record>> entry: toEmitPerShard.entrySet()) {
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
index da08a50..cf9295a 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisSpout.java
@@ -18,20 +18,20 @@
 
 package org.apache.storm.kinesis.spout;
 
+import java.util.Map;
+
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 
-import java.util.Map;
-
 public class KinesisSpout extends BaseRichSpout {
 
     private final KinesisConfig kinesisConfig;
     private transient KinesisRecordsManager kinesisRecordsManager;
     private transient SpoutOutputCollector collector;
 
-    public KinesisSpout (KinesisConfig kinesisConfig) {
+    public KinesisSpout(KinesisConfig kinesisConfig) {
         this.kinesisConfig = kinesisConfig;
     }
 
@@ -41,7 +41,7 @@ public class KinesisSpout extends BaseRichSpout {
     }
 
     @Override
-    public Map<String, Object> getComponentConfiguration () {
+    public Map<String, Object> getComponentConfiguration() {
         return super.getComponentConfiguration();
     }
 
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
index c806539..bee9e80 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/RecordToTupleMapper.java
@@ -15,24 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kinesis.spout;
 
 import com.amazonaws.services.kinesis.model.Record;
-import org.apache.storm.tuple.Fields;
 
 import java.util.List;
 
+import org.apache.storm.tuple.Fields;
+
 public interface RecordToTupleMapper {
     /**
-     *
+     * Retrieve the names of fields.
      * @return names of fields in the emitted tuple
      */
-    Fields getOutputFields ();
+    Fields getOutputFields();
 
     /**
-     *
+     * Retrieve the tuple.
      * @param record kinesis record
      * @return storm tuple to be emitted for this record, null if no tuple should be emitted
      */
-    List<Object> getTuple (Record record);
+    List<Object> getTuple(Record record);
 }
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java
similarity index 84%
rename from external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
rename to external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java
index 46a865b..d0a2dea 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZKConnection.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkConnection.java
@@ -18,31 +18,33 @@
 
 package org.apache.storm.kinesis.spout;
 
+import java.nio.charset.Charset;
+import java.util.Map;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.zookeeper.CreateMode;
 import org.json.simple.JSONValue;
 
-import java.nio.charset.Charset;
-import java.util.Map;
-
-class ZKConnection {
+class ZkConnection {
 
     private final ZkInfo zkInfo;
     private CuratorFramework curatorFramework;
 
-    ZKConnection (ZkInfo zkInfo) {
+    ZkConnection(ZkInfo zkInfo) {
         this.zkInfo = zkInfo;
     }
 
-    void initialize () {
-        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(), zkInfo.getSessionTimeoutMs(), zkInfo.getConnectionTimeoutMs(), new
-                RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
+    void initialize() {
+        curatorFramework = CuratorFrameworkFactory.newClient(zkInfo.getZkUrl(),
+                zkInfo.getSessionTimeoutMs(),
+                zkInfo.getConnectionTimeoutMs(),
+                new RetryNTimes(zkInfo.getRetryAttempts(), zkInfo.getRetryIntervalMs()));
         curatorFramework.start();
     }
 
-    void commitState (String stream, String shardId, Map<Object, Object> state) {
+    void commitState(String stream, String shardId, Map<Object, Object> state) {
         byte[] bytes = JSONValue.toJSONString(state).getBytes(Charset.forName("UTF-8"));
         try {
             String path = getZkPath(stream, shardId);
@@ -59,7 +61,7 @@ class ZKConnection {
         }
     }
 
-    Map<Object, Object> readState (String stream, String shardId) {
+    Map<Object, Object> readState(String stream, String shardId) {
         try {
             String path = getZkPath(stream, shardId);
             Map<Object, Object> state = null;
@@ -76,11 +78,11 @@ class ZKConnection {
         }
     }
 
-    void shutdown () {
+    void shutdown() {
         curatorFramework.close();
     }
 
-    private String getZkPath (String stream, String shardId) {
+    private String getZkPath(String stream, String shardId) {
         String path = "";
         if (!zkInfo.getZkNode().startsWith("/")) {
             path += "/";
diff --git a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
index a47f0ab..35e2890 100644
--- a/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
+++ b/external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java
@@ -36,8 +36,13 @@ public class ZkInfo implements Serializable {
     // time to sleep between retries in milliseconds
     private final Integer retryIntervalMs;
 
-    public ZkInfo (String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs, Integer retryAttempts, Integer
-            retryIntervalMs) {
+    public ZkInfo(String zkUrl,
+            String zkNode,
+            Integer sessionTimeoutMs,
+            Integer connectionTimeoutMs,
+            Long commitIntervalMs,
+            Integer retryAttempts,
+            Integer retryIntervalMs) {
         this.zkUrl = zkUrl;
         this.zkNode = zkNode;
         this.sessionTimeoutMs = sessionTimeoutMs;
@@ -76,7 +81,7 @@ public class ZkInfo implements Serializable {
         return retryIntervalMs;
     }
 
-    private void validate () {
+    private void validate() {
 
         if (zkUrl == null || zkUrl.length() < 1) {
             throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper");
@@ -91,12 +96,13 @@ public class ZkInfo implements Serializable {
         checkPositive(retryIntervalMs, "retryIntervalMs");
     }
 
-    private void checkPositive (Integer argument, String name) {
+    private void checkPositive(Integer argument, String name) {
         if (argument == null && argument <= 0) {
             throw new IllegalArgumentException(name + " must be positive");
         }
     }
-    private void checkPositive (Long argument, String name) {
+
+    private void checkPositive(Long argument, String name) {
         if (argument == null && argument <= 0) {
             throw new IllegalArgumentException(name + " must be positive");
         }
@@ -104,15 +110,15 @@ public class ZkInfo implements Serializable {
 
     @Override
     public String toString() {
-        return "ZkInfo{" +
-                "zkUrl='" + zkUrl + '\'' +
-                ", zkNode='" + zkNode + '\'' +
-                ", sessionTimeoutMs=" + sessionTimeoutMs +
-                ", connectionTimeoutMs=" + connectionTimeoutMs +
-                ", commitIntervalMs=" + commitIntervalMs +
-                ", retryAttempts=" + retryAttempts +
-                ", retryIntervalMs=" + retryIntervalMs +
-                '}';
+        return "ZkInfo{"
+                + "zkUrl='" + zkUrl + '\''
+                + ", zkNode='" + zkNode + '\''
+                + ", sessionTimeoutMs=" + sessionTimeoutMs
+                + ", connectionTimeoutMs=" + connectionTimeoutMs
+                + ", commitIntervalMs=" + commitIntervalMs
+                + ", retryAttempts=" + retryAttempts
+                + ", retryIntervalMs=" + retryIntervalMs
+                + '}';
     }
 
 }