Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/21 10:41:51 UTC

[2/5] storm git commit: STORM-2515: Fix most checkstyle violations in storm-kafka-client

STORM-2515: Fix most checkstyle violations in storm-kafka-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d8f5d6e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d8f5d6e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d8f5d6e

Branch: refs/heads/master
Commit: 4d8f5d6edfd9c9a405cb4c6ea84312752e21fd6e
Parents: 2a0c168
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon May 15 21:47:04 2017 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 19:30:44 2017 +0900

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |   2 +-
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  |  43 +++---
 .../FieldNameBasedTupleToKafkaMapper.java       |   1 +
 .../kafka/bolt/mapper/TupleToKafkaMapper.java   |   6 +-
 .../bolt/selector/DefaultTopicSelector.java     |   1 +
 .../bolt/selector/FieldIndexTopicSelector.java  |   6 +
 .../bolt/selector/FieldNameTopicSelector.java   |   1 +
 .../kafka/bolt/selector/KafkaTopicSelector.java |   4 +-
 .../kafka/spout/ByTopicRecordTranslator.java    |  23 +--
 .../kafka/spout/DefaultRecordTranslator.java    |   3 +-
 .../java/org/apache/storm/kafka/spout/Func.java |   3 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    |  73 ++++++----
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 143 +++++++++++--------
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  40 ++++--
 .../KafkaSpoutRetryExponentialBackoff.java      |  56 ++++----
 .../kafka/spout/KafkaSpoutRetryService.java     |  12 +-
 .../apache/storm/kafka/spout/KafkaTuple.java    |   8 +-
 .../spout/ManualPartitionNamedSubscription.java |   2 +-
 .../ManualPartitionPatternSubscription.java     |   4 +-
 .../storm/kafka/spout/ManualPartitioner.java    |   2 +-
 .../storm/kafka/spout/NamedSubscription.java    |   4 +-
 .../storm/kafka/spout/PatternSubscription.java  |   4 +-
 .../storm/kafka/spout/RecordTranslator.java     |  16 +--
 .../spout/RoundRobinManualPartitioner.java      |   6 +-
 .../kafka/spout/SerializableDeserializer.java   |   1 +
 .../kafka/spout/SimpleRecordTranslator.java     |   8 +-
 .../apache/storm/kafka/spout/Subscription.java  |   2 +-
 .../kafka/spout/TopicPartitionComparator.java   |   6 +-
 .../spout/internal/KafkaConsumerFactory.java    |   3 +-
 .../internal/KafkaConsumerFactoryDefault.java   |   1 +
 .../kafka/spout/internal/OffsetManager.java     |  25 +++-
 .../storm/kafka/spout/internal/Timer.java       |   3 +-
 .../trident/KafkaTridentSpoutBatchMetadata.java |  29 ++--
 .../spout/trident/KafkaTridentSpoutEmitter.java |  56 ++++----
 .../spout/trident/KafkaTridentSpoutManager.java |  15 +-
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  12 +-
 .../KafkaTridentSpoutOpaqueCoordinator.java     |  16 +--
 .../KafkaTridentSpoutTopicPartition.java        |  13 +-
 ...KafkaTridentSpoutTopicPartitionRegistry.java |   3 +-
 .../trident/KafkaTridentSpoutTransactional.java |   3 +-
 .../storm/kafka/trident/TridentKafkaState.java  |  78 +++++-----
 .../kafka/trident/TridentKafkaStateFactory.java |  12 +-
 .../FieldNameBasedTupleToKafkaMapper.java       |   1 +
 .../mapper/TridentTupleToKafkaMapper.java       |   7 +-
 .../trident/selector/DefaultTopicSelector.java  |   1 +
 .../trident/selector/KafkaTopicSelector.java    |   4 +-
 .../java/org/apache/storm/kafka/KafkaUnit.java  |  21 ++-
 .../org/apache/storm/kafka/KafkaUnitRule.java   |   3 +-
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  |   1 -
 .../spout/ByTopicRecordTranslatorTest.java      |   1 -
 .../spout/DefaultRecordTranslatorTest.java      |   1 -
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |   9 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |  16 +--
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   1 -
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   1 -
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  31 ++--
 .../kafka/spout/test/KafkaSpoutTestBolt.java    |   3 +-
 .../test/KafkaSpoutTopologyMainNamedTopics.java |   1 -
 .../KafkaSpoutTopologyMainWildcardTopics.java   |   1 -
 59 files changed, 470 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index a275428..843868f 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -152,7 +152,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>342</maxAllowedViolations>
+                    <maxAllowedViolations>9</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
index 1a401b2..2ff1a54 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -15,29 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt;
 
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
 import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
 import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
 import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
-import java.util.concurrent.Future;
-import java.util.concurrent.ExecutionException;
-import java.util.Map;
-import java.util.Properties;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -81,7 +80,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
     }
 
     /**
-     * Set the messages to be published to a single topic
+     * Set the messages to be published to a single topic.
      * @param topic the topic to publish to
      * @return this
      */
@@ -139,7 +138,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
             key = mapper.getKeyFromTuple(input);
             message = mapper.getMessageFromTuple(input);
             topic = topicSelector.getTopic(input);
-            if (topic != null ) {
+            if (topic != null) {
                 Callback callback = null;
 
                 if (!fireAndForget && async) {
@@ -194,7 +193,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
      * the tuple as soon as it has handed the message off to the producer API
      * if false (the default) the message will be acked after it was successfully sent to kafka or
      * failed if it was not successfully sent.
-     * @param fireAndForget
+     * @param fireAndForget whether the bolt should fire and forget
      */
     public void setFireAndForget(boolean fireAndForget) {
         this.fireAndForget = fireAndForget;
@@ -211,10 +210,10 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
     
     @Override
     public String toString() {
-        return "KafkaBolt: {mapper: " + mapper +
-                " topicSelector: " + topicSelector +
-                " fireAndForget: " + fireAndForget +
-                " async: " + async +
-                " proerties: " + boltSpecifiedProperties;
+        return "KafkaBolt: {mapper: " + mapper 
+            + " topicSelector: " + topicSelector
+            + " fireAndForget: " + fireAndForget 
+            + " async: " + async 
+            + " proerties: " + boltSpecifiedProperties;
     }
 }

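For context, a minimal sketch of wiring up the KafkaBolt touched above. The broker address, topic, serializer settings, and the with* setters are illustrative assumptions about this module's API rather than content of the commit itself:

    import java.util.Properties;
    import org.apache.storm.kafka.bolt.KafkaBolt;
    import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
    import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;

    public class KafkaBoltExample {
        public static KafkaBolt<String, String> buildBolt() {
            Properties props = new Properties();
            // Illustrative producer settings
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector("my-topic"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());
            // With fireAndForget=false (the default) a tuple is acked only after
            // the producer reports a successful send
            bolt.setFireAndForget(false);
            return bolt;
        }
    }
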
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
index f7638aa..0302c57 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt.mapper;
 
 import org.apache.storm.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
index 9f11fc9..531c60c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.kafka.bolt.mapper;
 
-import org.apache.storm.tuple.Tuple;
+package org.apache.storm.kafka.bolt.mapper;
 
 import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
 
 /**
  * Interface defining a mapping from storm tuple to kafka key and message.
@@ -27,6 +27,8 @@ import java.io.Serializable;
  * @param <V> type of value.
  */
 public interface TupleToKafkaMapper<K,V> extends Serializable {
+    
     K getKeyFromTuple(Tuple tuple);
+    
     V getMessageFromTuple(Tuple tuple);
 }

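A minimal implementation sketch of the interface above; the tuple field names "id" and "payload" are illustrative assumptions:

    import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
    import org.apache.storm.tuple.Tuple;

    public class IdPayloadMapper implements TupleToKafkaMapper<String, String> {
        // The Kafka record key comes from the "id" field of the tuple
        @Override
        public String getKeyFromTuple(Tuple tuple) {
            return tuple.getStringByField("id");
        }

        // The Kafka record value comes from the "payload" field of the tuple
        @Override
        public String getMessageFromTuple(Tuple tuple) {
            return tuple.getStringByField("payload");
        }
    }
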
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
index 3d00fc1..4adc466 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt.selector;
 
 import org.apache.storm.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
index ffe0b35a..e6c35ce 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt.selector;
 
 import org.apache.storm.tuple.Tuple;
@@ -32,6 +33,11 @@ public class FieldIndexTopicSelector implements KafkaTopicSelector {
     private final int fieldIndex;
     private final String defaultTopicName;
 
+    /**
+     * Creates a new FieldIndexTopicSelector.
+     * @param fieldIndex The index of the field containing the topic name
+     * @param defaultTopicName The default topic name if the topic name cannot be read from the tuple
+     */
     public FieldIndexTopicSelector(int fieldIndex, String defaultTopicName) {
         this.fieldIndex = fieldIndex;
         if (fieldIndex < 0) {

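Given the constructor documented above, a usage sketch (the field index and fallback topic name are illustrative):

    import org.apache.storm.kafka.bolt.selector.FieldIndexTopicSelector;
    import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;

    public class SelectorExample {
        public static KafkaTopicSelector makeSelector() {
            // Read the destination topic from tuple field 0; fall back to
            // "fallback-topic" when the topic cannot be read from the tuple
            return new FieldIndexTopicSelector(0, "fallback-topic");
        }
    }
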
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
index e90b26f..c8fa74c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka.bolt.selector;
 
 import org.apache.storm.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
index cb7fb44..cd6dd29 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.kafka.bolt.selector;
 
-import org.apache.storm.tuple.Tuple;
+package org.apache.storm.kafka.bolt.selector;
 
 import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
 
 public interface KafkaTopicSelector extends Serializable {
     String getTopic(Tuple tuple);

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
index 855ab30..71f8e9e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
@@ -15,19 +15,19 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.tuple.Fields;
 
 /**
  * Based off of a given Kafka topic a ConsumerRecord came from it will be translated to a Storm tuple
- * and emitted to a given stream
+ * and emitted to a given stream.
  * @param <K> the key of the incoming Records
  * @param <V> the value of the incoming Records
  */
@@ -62,7 +62,7 @@ public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
     
     /**
      * @param defaultTranslator a translator that will be used for all topics not explicitly set
-     * elsewhere.
+     *     elsewhere.
      */
     public ByTopicRecordTranslator(RecordTranslator<K,V> defaultTranslator) {
         this.defaultTranslator = defaultTranslator;
@@ -77,7 +77,8 @@ public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
      * @param fields the names of the fields extracted
      * @return this to be able to chain configuration
      * @throws IllegalStateException if the topic is already registered to another translator
-     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match
+     *     any already configured Fields for the same stream
      */
     public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
         return forTopic(topic, new SimpleRecordTranslator<>(func, fields));
@@ -91,19 +92,22 @@ public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
      * @param stream the stream to emit the tuples to.
      * @return this to be able to chain configuration
      * @throws IllegalStateException if the topic is already registered to another translator
-     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match
+     *     any already configured Fields for the same stream
      */
-    public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>,
+        List<Object>> func, Fields fields, String stream) {
         return forTopic(topic, new SimpleRecordTranslator<>(func, fields, stream));
     }
     
     /**
-     * Configure a translator for a given kafka topic
+     * Configure a translator for a given kafka topic.
      * @param topic the topic this translator should handle
      * @param translator the translator itself
      * @return this to be able to chain configuration
      * @throws IllegalStateException if the topic is already registered to another translator
-     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match
+     *     any already configured Fields for the same stream
      */
     public ByTopicRecordTranslator<K, V> forTopic(String topic, RecordTranslator<K,V> translator) {
         if (topicToTranslator.containsKey(topic)) {
@@ -119,7 +123,8 @@ public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
             Fields fromTrans = translator.getFieldsFor(stream);
             Fields cached = streamToFields.get(stream);
             if (cached != null && !fromTrans.equals(cached)) {
-                throw new IllegalArgumentException("Stream " + stream + " currently has Fields of " + cached + " which is not the same as those being added in " + fromTrans);
+                throw new IllegalArgumentException("Stream " + stream + " currently has Fields of " 
+                    + cached + " which is not the same as those being added in " + fromTrans);
             }
             
             if (cached == null) {

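The forTopic overloads above layer per-topic translators over a default one. A small sketch; the topic, stream, and field names are illustrative assumptions:

    import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class TranslatorExample {
        public static ByTopicRecordTranslator<String, String> build() {
            // Default translation, used for topics without an explicit mapping
            ByTopicRecordTranslator<String, String> trans =
                new ByTopicRecordTranslator<String, String>(
                    r -> new Values(r.topic(), r.key(), r.value()),
                    new Fields("topic", "key", "value"));
            // Records from the "clicks" topic go to their own stream with different fields
            trans.forTopic("clicks",
                r -> new Values(r.key(), r.value()),
                new Fields("user", "click"),
                "click_stream");
            return trans;
        }
    }
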
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
index f40ab18..b08b541 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
@@ -15,10 +15,10 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.List;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
@@ -26,6 +26,7 @@ import org.apache.storm.tuple.Values;
 public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
     private static final long serialVersionUID = -5782462870112305750L;
     public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
+    
     @Override
     public List<Object> apply(ConsumerRecord<K, V> record) {
         return new Values(record.topic(),

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
index 0414533..643bc9d 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
@@ -15,13 +15,14 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.io.Serializable;
 
 /**
  * A simple interface to allow compatibility with non java 8
- * code bases 
+ * code bases. 
  */
 public interface Func<V, R> extends Serializable {
     R apply(V record);

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index bb76535..72b8d14 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -71,17 +70,27 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient boolean consumerAutoCommitMode;
 
     // Bookkeeping
-    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
-    private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
-    private transient Timer commitTimer;                                // timer == null for auto commit mode
-    private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
+    // Strategy to determine the fetch offset of the first realized by the spout upon activation
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  
+    // Class that has the logic to handle tuple failure
+    private transient KafkaSpoutRetryService retryService;              
+    // timer == null for auto commit mode
+    private transient Timer commitTimer;                                
+    // Flag indicating that the spout is still undergoing initialization process.
+    private transient boolean initialized;                              
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    private transient Map<TopicPartition, OffsetManager> offsetManagers;// Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
-    private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
-    private transient Timer refreshSubscriptionTimer;                   // Triggers when a subscription should be refreshed
+    // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
+    // or after a consumer rebalance, or during close/deactivate
+    private transient Map<TopicPartition, OffsetManager> offsetManagers;
+    // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
+    private transient Set<KafkaSpoutMessageId> emitted;                 
+    // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;     
+    // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
+    private transient long numUncommittedOffsets;                       
+    // Triggers when a subscription should be refreshed
+    private transient Timer refreshSubscriptionTimer;                   
     private transient TopologyContext context;
 
 
@@ -147,7 +156,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         private void initialize(Collection<TopicPartition> partitions) {
             if (!consumerAutoCommitMode) {
-                offsetManagers.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
+                // remove from acked all partitions that are no longer assigned to this spout
+                offsetManagers.keySet().retainAll(partitions);
             }
 
             retryService.retainAll(partitions);
@@ -175,7 +185,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
 
         /**
-         * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+         * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
          */
         private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
             long fetchOffset;
@@ -252,19 +262,22 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private boolean poll() {
         final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
         final int readyMessageCount = retryService.readyMessageCount();
-        final boolean poll = !waitingToEmit() &&
+        final boolean poll = !waitingToEmit()
             //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
-            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, and prevents locking up the spout when there are too many retriable tuples
-            (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets ||
-            consumerAutoCommitMode);
+            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
+            //and prevents locking up the spout when there are too many retriable tuples
+            && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
+            || consumerAutoCommitMode);
         
         if (!poll) {
             if (waitingToEmit()) {
-                LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
+                LOG.debug("Not polling. Tuples waiting to be emitted."
+                    + " [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
             }
 
             if (numUncommittedOffsets >= maxUncommittedOffsets && !consumerAutoCommitMode) {
-                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
+                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]",
+                    numUncommittedOffsets, maxUncommittedOffsets);
             }
         }
         return poll;
@@ -274,7 +287,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return waitingToEmit != null && waitingToEmit.hasNext();
     }
 
-    public void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
+    private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
         List<ConsumerRecord<K, V>> waitingToEmitList = new LinkedList<>();
         for (TopicPartition tp : consumerRecords.partitions()) {
             waitingToEmitList.addAll(consumerRecords.records(tp));
@@ -290,7 +303,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
         final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
-        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
+        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions",
+            numPolledRecords, numUncommittedOffsets);
         return consumerRecords;
     }
 
@@ -363,7 +377,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     /**
-     * Emits a tuple if it is not a null tuple, or if the spout is configured to emit null tuples
+     * Emits a tuple if it is not a null tuple, or if the spout is configured to emit null tuples.
      */
     private boolean isEmitTuple(List<Object> tuple) {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
@@ -406,9 +420,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             if (msgId.isEmitted()) {
-                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " +
-                        "came from a topic-partition that this consumer group instance is no longer tracking " +
-                        "due to rebalance/partition reassignment. No action taken.", msgId);
+                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
+                        + "came from a topic-partition that this consumer group instance is no longer tracking "
+                        + "due to rebalance/partition reassignment. No action taken.", msgId);
             } else {
                 LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
             }
@@ -426,7 +440,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     public void fail(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
-            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+            LOG.debug("Received fail for tuple this spout is no longer tracking."
+                + " Partitions may have been reassigned. Ignoring message [{}]", msgId);
             return;
         }
         emitted.remove(msgId);
@@ -493,10 +508,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public String toString() {
-        return "KafkaSpout{" +
-                "offsetManagers =" + offsetManagers +
-                ", emitted=" + emitted +
-                "}";
+        return "KafkaSpout{"
+                + "offsetManagers =" + offsetManagers
+                + ", emitted=" + emitted
+                + "}";
     }
 
     @Override

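The reworked poll() condition above gates polling on the number of uncommitted offsets, discounting tuples already scheduled for retry so the spout cannot lock up. A standalone sketch of that arithmetic (the class, method, and parameter names here are ours, not the spout's):

    public final class PollGate {
        // Mirrors the spout's check: poll only if nothing is waiting to be emitted,
        // and the uncommitted offsets that are not ready for retry stay under the
        // limit (or the consumer auto-commits)
        public static boolean shouldPoll(boolean hasWaitingToEmit, long numUncommittedOffsets,
                int readyMessageCount, int maxUncommittedOffsets, boolean autoCommitMode) {
            return !hasWaitingToEmit
                && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
                || autoCommitMode);
        }

        public static void main(String[] args) {
            // 10 uncommitted offsets, 4 of them ready for retry, limit 8:
            // 10 - 4 = 6 < 8, so polling is still allowed
            System.out.println(shouldPoll(false, 10, 4, 8, false)); // prints true
        }
    }
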
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index e89ef93..43a6e0b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,13 +18,6 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.tuple.Fields;
-
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -32,17 +25,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.tuple.Fields;
 
 /**
- * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
+ * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
  */
 public class KafkaSpoutConfig<K, V> implements Serializable {
     private static final long serialVersionUID = 141902646130682494L;
-    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;            // 200ms
-    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   // 30s
-    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
-    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    // 10,000,000 records => 80MBs of memory footprint in the worst case
-    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s
+    // 200ms
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;            
+    // 30s
+    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   
+    // Retry forever
+    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     
+    // 10,000,000 records => 80MBs of memory footprint in the worst case
+    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    
+    // 2s
+    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; 
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =  
             new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
                     DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
@@ -50,22 +54,25 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * Retry in a tight loop (keep unit tests fasts) do not use in production.
      */
     public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = 
-    new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
+        new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
             DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
 
     /**
-     * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will
-     * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
+     * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer
+     * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
      * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
      * <ul>
-     * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
-     * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
-     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
-     * If no offset has been committed, it behaves as EARLIEST.</li>
-     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
-     * If no offset has been committed, it behaves as LATEST.</li>
+     * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous
+     * commits</li>
+     * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of
+     * previous commits</li>
+     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
+     * committed, it behaves as EARLIEST.</li>
+     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
+     * committed, it behaves as LATEST.</li>
      * </ul>
-     * */
+     *
+     */
     public static enum FirstPollOffsetStrategy {
         EARLIEST,
         LATEST,
@@ -116,35 +123,43 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
+            SerializableDeserializer<V> valDes, Collection<String> topics) {
             this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
+            SerializableDeserializer<V> valDes, Pattern topics) {
             this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
+            SerializableDeserializer<V> valDes, Subscription subscription) {
             this(bootstrapServers, keyDes, null, valDes, null, subscription);
         }
         
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) {
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
+                Class<? extends Deserializer<V>> valDes, String ... topics) {
             this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
+                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
             this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
+                Class<? extends Deserializer<V>> valDes, Pattern topics) {
             this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
+                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
             this(bootstrapServers, null, keyDes, null, valDes, subscription);
         }
         
-        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
+                Class<? extends Deserializer<K>> keyDesClazz,
                 SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
             kafkaProps = new HashMap<>();
             if (bootstrapServers == null || bootstrapServers.isEmpty()) {
@@ -184,7 +199,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
          * set a custom RecordTranslator before calling this it may result in class cast
          * exceptions at runtime.
          */
-        public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+        public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
             return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
         }
         
@@ -194,7 +209,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
          * set a custom RecordTranslator before calling this it may result in class cast
          * exceptions at runtime.
          */
-        public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
+        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
             return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
         }
 
@@ -203,7 +218,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
          * set a custom RecordTranslator before calling this it may result in class cast
          * exceptions at runtime.
          */
-        public <NV> Builder<K,NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
+        public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
             return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
         }
         
@@ -213,12 +228,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
          * set a custom RecordTranslator before calling this it may result in class cast
          * exceptions at runtime.
          */
-        public <NV> Builder<K,NV> setValue(Class<? extends Deserializer<NV>> clazz) {
+        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
             return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
         }
         
         /**
-         * Set a Kafka property config
+         * Set a Kafka property config.
          */
         public Builder<K,V> setProp(String key, Object value) {
             kafkaProps.put(key, value);
@@ -226,7 +241,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
         
         /**
-         * Set multiple Kafka property configs
+         * Set multiple Kafka property configs.
          */
         public Builder<K,V> setProp(Map<String, Object> props) {
             kafkaProps.putAll(props);
@@ -234,7 +249,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
         
         /**
-         * Set multiple Kafka property configs
+         * Set multiple Kafka property configs.
          */
         public Builder<K,V> setProp(Properties props) {
             for (String name: props.stringPropertyNames()) {
@@ -244,14 +259,14 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
         
         /**
-         * Set the group.id for the consumers
+         * Set the group.id for the consumers.
          */
         public Builder<K,V> setGroupId(String id) {
             return setProp("group.id", id);
         }
         
         /**
-         * reset the bootstrap servers for the Consumer
+         * reset the bootstrap servers for the Consumer.
          */
         public Builder<K,V> setBootstrapServers(String servers) {
             return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
@@ -281,7 +296,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         //Security Related Configs
         
         /**
-         * Configure the SSL Keystore for mutual authentication
+         * Configure the SSL Keystore for mutual authentication.
          */
         public Builder<K,V> setSSLKeystore(String location, String password) {
             return setProp("ssl.keystore.location", location)
@@ -289,7 +304,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
        
         /**
-         * Configure the SSL Keystore for mutual authentication
+         * Configure the SSL Keystore for mutual authentication.
          */
         public Builder<K,V> setSSLKeystore(String location, String password, String keyPassword) {
             return setProp("ssl.key.password", keyPassword)
@@ -297,7 +312,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
         
         /**
-         * Configure the SSL Truststore to authenticate with the brokers
+         * Configure the SSL Truststore to authenticate with the brokers.
          */
         public Builder<K,V> setSSLTruststore(String location, String password) {
             return setSecurityProtocol("SSL")
@@ -315,7 +330,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
         //Spout Settings
         /**
-         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
+         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s.
          * @param pollTimeoutMs time in ms
          */
         public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
@@ -439,6 +454,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final long partitionRefreshPeriodMs;
     private final boolean emitNullTuples;
 
+    /**
+     * Creates a new KafkaSpoutConfig using a Builder.
+     * @param builder The Builder to construct the KafkaSpoutConfig from
+     */
     public KafkaSpoutConfig(Builder<K,V> builder) {
         this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
         this.subscription = builder.subscription;
@@ -456,10 +475,18 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.emitNullTuples = builder.emitNullTuples;
     }
 
+    /**
+     * Gets the properties that will be passed to the KafkaConsumer.
+     * @return The Kafka properties map
+     */
     public Map<String, Object> getKafkaProps() {
         return kafkaProps;
     }
 
+    /**
+     * Gets the Kafka record key deserializer.
+     * @return The key deserializer
+     */
     public Deserializer<K> getKeyDeserializer() {
         if (keyDesClazz != null) {
             try {
@@ -471,6 +498,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return keyDes;
     }
 
+    /**
+     * Gets the Kafka record value deserializer.
+     * @return The value deserializer
+     */
     public Deserializer<V> getValueDeserializer() {
         if (valueDesClazz != null) {
             try {
@@ -529,17 +560,17 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     @Override
     public String toString() {
-        return "KafkaSpoutConfig{" +
-                "kafkaProps=" + kafkaProps +
-                ", key=" + getKeyDeserializer() +
-                ", value=" + getValueDeserializer() +
-                ", pollTimeoutMs=" + pollTimeoutMs +
-                ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
-                ", maxUncommittedOffsets=" + maxUncommittedOffsets +
-                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
-                ", subscription=" + subscription +
-                ", translator=" + translator +
-                ", retryService=" + retryService +
-                '}';
+        return "KafkaSpoutConfig{"
+                + "kafkaProps=" + kafkaProps
+                + ", key=" + getKeyDeserializer()
+                + ", value=" + getValueDeserializer()
+                + ", pollTimeoutMs=" + pollTimeoutMs
+                + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
+                + ", maxUncommittedOffsets=" + maxUncommittedOffsets
+                + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
+                + ", subscription=" + subscription
+                + ", translator=" + translator
+                + ", retryService=" + retryService
+                + '}';
     }
 }

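Putting the Builder constructors and the FirstPollOffsetStrategy values documented above together, a configuration sketch. The broker, topic, and group names are illustrative, and the setters beyond setGroupId are assumed from this version of the Builder API:

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

    public class SpoutConfigExample {
        public static KafkaSpoutConfig<String, String> build() {
            return new KafkaSpoutConfig.Builder<>(
                    "localhost:9092", StringDeserializer.class, StringDeserializer.class, "my-topic")
                .setGroupId("my-group")
                // Resume from the last committed offset, or start from the
                // earliest offset if nothing has been committed yet
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .setOffsetCommitPeriodMs(10_000)
                .setMaxUncommittedOffsets(250)
                .build();
        }
    }
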
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index dea57c4..e2ab5b7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -25,8 +25,11 @@ public class KafkaSpoutMessageId {
     private transient TopicPartition topicPart;
     private transient long offset;
     private transient int numFails = 0;
-    private boolean emitted;   // true if the record was emitted using a form of collector.emit(...).
-                               // false when skipping null tuples as configured by the user in KafkaSpoutConfig
+    /**
+     * true if the record was emitted using a form of collector.emit(...). false
+     * when skipping null tuples as configured by the user in KafkaSpoutConfig
+     */
+    private boolean emitted;   
 
     public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
         this(consumerRecord, true);
@@ -40,6 +43,12 @@ public class KafkaSpoutMessageId {
         this(topicPart, offset, true);
     }
 
+    /**
+     * Creates a new KafkaSpoutMessageId.
+     * @param topicPart The topic partition this message belongs to
+     * @param offset The offset of this message
+     * @param emitted True iff this message is not being skipped as a null tuple
+     */
     public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) {
         this.topicPart = topicPart;
         this.offset = offset;
@@ -78,22 +87,27 @@ public class KafkaSpoutMessageId {
         this.emitted = emitted;
     }
 
+    /**
+     * Gets metadata for this message which may be committed to Kafka.
+     * @param currThread The calling thread
+     * @return The metadata
+     */
     public String getMetadata(Thread currThread) {
-        return "{" +
-                "topic-partition=" + topicPart +
-                ", offset=" + offset +
-                ", numFails=" + numFails +
-                ", thread='" + currThread.getName() + "'" +
-                '}';
+        return "{"
+                + "topic-partition=" + topicPart
+                + ", offset=" + offset
+                + "topic-partition=" + topicPart + ", numFails=" + numFails
+                + ", thread='" + currThread.getName() + "'"
+                + "topic-partition=" + topicPart + '}';
     }
 
     @Override
     public String toString() {
-        return "{" +
-                "topic-partition=" + topicPart +
-                ", offset=" + offset +
-                ", numFails=" + numFails +
-                '}';
+        return "{"
+                + "topic-partition=" + topicPart
+                + ", offset=" + offset
+                + ", numFails=" + numFails
+                + '}';
     }
 
     @Override

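Given the constructor documented above, a construction sketch (the topic, partition, and offset values are illustrative):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;

    public class MessageIdExample {
        public static KafkaSpoutMessageId make() {
            // emitted=true: the record was emitted rather than skipped as a null tuple
            return new KafkaSpoutMessageId(new TopicPartition("my-topic", 0), 42L, true);
        }
    }
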
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 5c6833a..60c34dc 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -18,10 +18,6 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
@@ -32,7 +28,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
@@ -52,14 +51,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speed up lookups
 
     /**
-     * Comparator ordering by timestamp 
+     * Comparator ordering by timestamp.
      */
     private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
         @Override
         public int compare(RetrySchedule entry1, RetrySchedule entry2) {
             int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
             
-            if(result == 0) {
+            if (result == 0) {
                 //TreeSet uses compareTo instead of equals() for the Set contract
                 //Ensure that we can save two retry schedules with the same timestamp
                 result = entry1.hashCode() - entry2.hashCode();
@@ -89,10 +88,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
 
         @Override
         public String toString() {
-            return "RetrySchedule{" +
-                    "msgId=" + msgId +
-                    ", nextRetryTimeNanos=" + nextRetryTimeNanos +
-                    '}';
+            return "RetrySchedule{"
+                    + "msgId=" + msgId
+                    + ", nextRetryTimeNanos=" + nextRetryTimeNanos
+                    + '}';
         }
 
         public KafkaSpoutMessageId msgId() {
@@ -110,6 +109,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         private final long length;
 
         /**
+         * Creates a new TimeInterval.
          * @param length length of the time interval in the units specified by {@link TimeUnit}
         * @param timeUnit the time unit in which the interval length is specified
          */
@@ -141,20 +141,20 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
 
         @Override
         public String toString() {
-            return "TimeInterval{" +
-                    "length=" + length +
-                    ", timeUnit=" + timeUnit +
-                    '}';
+            return "TimeInterval{"
+                    + "length=" + length
+                    + ", timeUnit=" + timeUnit
+                    + '}';
         }
     }
 
     /**
-     * The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
-     * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
-     * nextRetry = Min(nextRetry, currentTime + maxDelay).
-     * 
-     * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
-     * polled records in favor of processing more records.
+     * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
+     * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod * 2^(failCount-1),
+     * where failCount = 1, 2, 3, ... The result is then capped: nextRetry = Min(nextRetry, currentTime + maxDelay).
+     * <p/>
+     * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
+     * previously polled records in favor of processing more records.
      *
      * @param initialDelay      initial delay of the first retry
      * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
@@ -237,7 +237,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
             final RetrySchedule retrySchedule = rsIterator.next();
             final KafkaSpoutMessageId msgId = retrySchedule.msgId;
-            final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition());
+            final TopicPartition tpRetry = new TopicPartition(msgId.topic(), msgId.partition());
             if (!topicPartitions.contains(tpRetry)) {
                 rsIterator.remove();
                 toRetryMsgs.remove(msgId);
@@ -292,17 +292,17 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         final long currentTimeNanos = Time.nanoTime();
         final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
                 ? currentTimeNanos + initialDelay.lengthNanos
-                : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
+                : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails() - 1));
         return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
     }
 
     @Override
     public String toString() {
-        return "KafkaSpoutRetryExponentialBackoff{" +
-                "delay=" + initialDelay +
-                ", ratio=" + delayPeriod +
-                ", maxRetries=" + maxRetries +
-                ", maxRetryDelay=" + maxDelay +
-                '}';
+        return "KafkaSpoutRetryExponentialBackoff{"
+                + "delay=" + initialDelay
+                + ", ratio=" + delayPeriod
+                + ", maxRetries=" + maxRetries
+                + ", maxRetryDelay=" + maxDelay
+                + '}';
     }
 }

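To make the corrected formula concrete, here is a minimal, self-contained sketch (illustrative only, not part of the patch) that mirrors the capped exponential delay computed by nextTime(), assuming initialDelay = 0 ms, delayPeriod = 2 ms and maxDelay = 10 s:

    import java.util.concurrent.TimeUnit;

    public class BackoffFormulaDemo {
        // Assumed illustrative configuration; pick values to suit your topology.
        static final long INITIAL_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(0);
        static final long DELAY_PERIOD_NANOS = TimeUnit.MILLISECONDS.toNanos(2);
        static final long MAX_DELAY_NANOS = TimeUnit.SECONDS.toNanos(10);

        // Mirrors nextTime(): delay grows as delayPeriod * 2^(numFails - 1), capped at maxDelay.
        static long nextRetryDelayNanos(int numFails) {
            long delay = numFails == 1
                    ? INITIAL_DELAY_NANOS
                    : DELAY_PERIOD_NANOS * (long) Math.pow(2, numFails - 1);
            return Math.min(delay, MAX_DELAY_NANOS);
        }

        public static void main(String[] args) {
            for (int fails = 1; fails <= 5; fails++) {
                System.out.println("numFails=" + fails + " -> delay "
                        + TimeUnit.NANOSECONDS.toMillis(nextRetryDelayNanos(fails)) + " ms");
            }
            // Prints delays of 0, 4, 8, 16 and 32 ms for this configuration.
        }
    }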
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index f0230c3..04e4ae7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -18,12 +18,10 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
-
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * Represents the logic that manages the retrial of failed tuples.
@@ -39,7 +37,7 @@ public interface KafkaSpoutRetryService extends Serializable {
     boolean schedule(KafkaSpoutMessageId msgId);
 
     /**
-     * Removes a message from the list of messages scheduled for retrial
+     * Removes a message from the list of messages scheduled for retrial.
      * @param msgId message to remove from retrial
      * @return true if the message was scheduled for retrial, false otherwise
      */
@@ -56,8 +54,8 @@ public interface KafkaSpoutRetryService extends Serializable {
 
     /**
      * @return The earliest retriable offset for each TopicPartition that has
-     * offsets ready to be retried, i.e. for which a tuple has failed
-     * and has retry time less than current time
+     *     offsets ready to be retried, i.e. for which a tuple has failed
+     *     and has retry time less than current time.
      */
     Map<TopicPartition, Long> earliestRetriableOffsets();
 
@@ -74,7 +72,7 @@ public interface KafkaSpoutRetryService extends Serializable {
      * The message may or may not be ready to be retried yet.
      * @param msgId message to check for scheduling status
      * @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
-     * Returns false is this message is not scheduled for retrial
+     *     Returns false if this message is not scheduled for retrial.
      */
     boolean isScheduled(KafkaSpoutMessageId msgId);
     

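For reference, a retry service is normally handed to the spout through KafkaSpoutConfig. The sketch below wires in the exponential backoff implementation from this module; the broker address and topic name are placeholders, and the builder calls are assumed to match this version of KafkaSpoutConfig.Builder:

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

    public class RetryConfigSketch {
        public static KafkaSpoutConfig<String, String> makeConfig() {
            // Retry failed tuples at most 5 times, starting at 500 ms and backing off up to 10 s.
            KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
                    TimeInterval.milliSeconds(500),  // initialDelay
                    TimeInterval.milliSeconds(500),  // delayPeriod (ratio of the progression)
                    5,                               // maxRetries
                    TimeInterval.seconds(10));       // maxDelay
            return KafkaSpoutConfig.builder("localhost:9092", "my-topic") // placeholder broker/topic
                    .setRetry(retry)
                    .build();
        }
    }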
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
index f5953ad..02085f2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
@@ -15,6 +15,7 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import org.apache.storm.tuple.Values;
@@ -35,8 +36,13 @@ public class KafkaTuple extends Values {
         super(vals);
     }
     
+    /**
+     * Sets the target stream of this Tuple.
+     * @param stream The target stream
+     * @return This
+     */
     public KafkaTuple routedTo(String stream) {
-        assert(this.stream == null);
+        assert this.stream == null;
         this.stream = stream;
         return this;
     }

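Because routedTo(...) returns this, it composes fluently inside a translator function. A brief sketch, using the single-method Func interface from this module and a hypothetical "updates" stream name:

    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.Func;
    import org.apache.storm.kafka.spout.KafkaTuple;

    public class RoutingFunc implements Func<ConsumerRecord<String, String>, List<Object>> {
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            // Route every tuple to the (hypothetical) "updates" stream.
            return new KafkaTuple(record.key(), record.value()).routedTo("updates");
        }
    }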
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
index 079cadb..926fdf0 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
@@ -15,6 +15,7 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.ArrayList;
@@ -24,7 +25,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
index 5f86605..2344477 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
@@ -15,6 +15,7 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.ArrayList;
@@ -24,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
-
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -55,7 +55,7 @@ public class ManualPartitionPatternSubscription extends PatternSubscription {
     @Override
     public void refreshAssignment() {
         List<TopicPartition> allPartitions = new ArrayList<>();
-        for(Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) {
+        for (Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) {
             if (pattern.matcher(entry.getKey()).matches()) {
                 for (PartitionInfo partitionInfo: entry.getValue()) {
                     allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
index 88803f8..4856687 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
@@ -15,10 +15,10 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.List;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.task.TopologyContext;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
index 6eba566..0eb48cb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
@@ -15,13 +15,13 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.storm.task.TopologyContext;
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Subscribe to all topics that follow a given list of values
+ * Subscribe to all topics that follow a given list of values.
  */
 public class NamedSubscription extends Subscription {
     private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
index 9a8de0f..ec53f01 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
@@ -15,10 +15,10 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.regex.Pattern;
-
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.storm.task.TopologyContext;
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Subscribe to all topics that match a given pattern
+ * Subscribe to all topics that match a given pattern.
  */
 public class PatternSubscription extends Subscription {
     private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
index c79f4e1..e12453a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
@@ -15,16 +15,15 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
-package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.tuple.Fields;
+package org.apache.storm.kafka.spout;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
+import org.apache.storm.tuple.Fields;
 
 /**
  * Translate a {@link org.apache.kafka.clients.consumer.ConsumerRecord} to a tuple.
@@ -36,20 +35,21 @@ public interface RecordTranslator<K, V> extends Serializable, Func<ConsumerRecor
      * Translate the ConsumerRecord into a list of objects that can be emitted.
      * @param record the record to translate
      * @return the objects in the tuple.  Return a {@link KafkaTuple}
-     * if you want to route the tuple to a non-default stream.
-     * Return null to discard an invalid {@link ConsumerRecord} if {@link Builder#setEmitNullTuples(boolean)} is set to true
+     *     if you want to route the tuple to a non-default stream.
+     *     Return null to discard an invalid {@link ConsumerRecord} if {@link Builder#setEmitNullTuples(boolean)} is set to false.
      */
     List<Object> apply(ConsumerRecord<K,V> record);
     
     /**
      * Get the fields associated with a stream.  The streams passed in are
-     * returned by the {@link RecordTranslator.streams} method.
+     * returned by the {@link RecordTranslator#streams()} method.
      * @param stream the stream the fields are for
      * @return the fields for that stream.
      */
     Fields getFieldsFor(String stream);
     
     /**
+     * Get the list of streams this translator will handle.
      * @return the list of streams that this will handle.
      */
     default List<String> streams() {

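As a rough illustration of the contract described above, the sketch below splits records across two streams; the stream and field names are made up, and the null-value routing is only an example:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.KafkaTuple;
    import org.apache.storm.kafka.spout.RecordTranslator;
    import org.apache.storm.tuple.Fields;

    public class TwoStreamTranslator implements RecordTranslator<String, String> {
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            // Send tombstones (null values) to a separate stream.
            String stream = record.value() == null ? "tombstones" : "default";
            return new KafkaTuple(record.key(), record.value()).routedTo(stream);
        }

        @Override
        public Fields getFieldsFor(String stream) {
            return new Fields("key", "value");
        }

        @Override
        public List<String> streams() {
            return Arrays.asList("default", "tombstones");
        }
    }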
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
index e23e2dc..4afcc49 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
@@ -15,13 +15,13 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.task.TopologyContext;
 
@@ -30,7 +30,7 @@ import org.apache.storm.task.TopologyContext;
  * not just the ones that are alive. Because the parallelism of 
  * the spouts does not typically change while running this makes
  * the assignments more stable in the face of crashing spouts.
- * 
+ * <p/>
 * Round Robin means that the first of N spouts will get the first
 * partition, the (N+1)th partition, and so on; the second spout will get the second
 * partition, the (N+2)th partition, etc.
@@ -41,7 +41,7 @@ public class RoundRobinManualPartitioner implements ManualPartitioner {
     public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
         int thisTaskIndex = context.getThisTaskIndex();
         int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
-        Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size()/totalTaskCount+1);
+        Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
         for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
             myPartitions.add(allPartitions.get(i));
         }

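The assignment arithmetic can be checked with a small standalone sketch (illustrative only) that mirrors the loop in partition() above: task i of n tasks takes partitions i, i+n, i+2n, and so on.

    import java.util.ArrayList;
    import java.util.List;

    public class RoundRobinDemo {
        static List<Integer> partitionsFor(int taskIndex, int totalTasks, int partitionCount) {
            List<Integer> mine = new ArrayList<>();
            for (int p = taskIndex; p < partitionCount; p += totalTasks) {
                mine.add(p);
            }
            return mine;
        }

        public static void main(String[] args) {
            // 2 spout tasks over 5 partitions: task 0 -> [0, 2, 4], task 1 -> [1, 3].
            System.out.println(partitionsFor(0, 2, 5));
            System.out.println(partitionsFor(1, 2, 5));
        }
    }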
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
index eb76a90..120260e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
index 46c2849..a294ed4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
@@ -15,11 +15,11 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.Arrays;
 import java.util.List;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.tuple.Fields;
 
@@ -33,6 +33,12 @@ public class SimpleRecordTranslator<K, V> implements RecordTranslator<K, V> {
         this(func, fields, "default");
     }
     
+    /**
+     * Creates a SimpleRecordTranslator.
+     * @param func The mapping function responsible for translating a Kafka record to a Tuple
+     * @param fields The fields tuples constructed by this translator will contain
+     * @param stream The stream tuples constructed by this translator will target
+     */
     public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
         this.func = func;
         this.fields = fields;

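A short usage sketch for this constructor, with hypothetical field and stream names (a lambda works here because Func has a single abstract method):

    import org.apache.storm.kafka.spout.SimpleRecordTranslator;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class TranslatorFactory {
        // Builds a translator that emits (key, value) tuples to a hypothetical "records" stream.
        public static SimpleRecordTranslator<String, String> keyValueTranslator() {
            return new SimpleRecordTranslator<>(
                    record -> new Values(record.key(), record.value()),
                    new Fields("key", "value"),
                    "records");
        }
    }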
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
index db2a7bb..53e825a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
@@ -15,10 +15,10 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.io.Serializable;
-
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.storm.task.TopologyContext;

http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
index dafb97c..91b7243 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
@@ -15,24 +15,24 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.Comparator;
-
 import org.apache.kafka.common.TopicPartition;
 
 /**
  * Singleton comparator of TopicPartitions.  Topics have precedence over partitions.
  * Topics are compared through String.compare and partitions are compared
  * numerically.
- * 
+ * <p/>
  * Use INSTANCE for all sorting.
  */
 public class TopicPartitionComparator implements Comparator<TopicPartition> {
     public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator();
     
     /**
-     * Private to make it a singleton
+     * Private to make it a singleton.
      */
     private TopicPartitionComparator() {
         //Empty

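A brief usage sketch: keeping a TreeSet ordered with the singleton (topic and partition values are made up):

    import java.util.TreeSet;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.TopicPartitionComparator;

    public class SortedPartitionsDemo {
        public static void main(String[] args) {
            // Ordered by topic first, then numerically by partition.
            TreeSet<TopicPartition> sorted = new TreeSet<>(TopicPartitionComparator.INSTANCE);
            sorted.add(new TopicPartition("b-topic", 0));
            sorted.add(new TopicPartition("a-topic", 2));
            sorted.add(new TopicPartition("a-topic", 1));
            System.out.println(sorted); // [a-topic-1, a-topic-2, b-topic-0]
        }
    }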
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
index 0b253b4..fb70927 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka.spout.internal;
 
 import java.io.Serializable;
@@ -20,7 +21,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 
 /**
- * This is here to enable testing
+ * This is here to enable testing.
  */
 public interface KafkaConsumerFactory<K, V> extends Serializable {
     public KafkaConsumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);

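As a sketch of the testing hook, a factory can capture (or substitute) the consumer it hands to the spout. This assumes KafkaSpoutConfig exposes its consumer properties via getKafkaProps(), as elsewhere in this module; the class is illustrative, not part of the patch:

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;

    public class CapturingConsumerFactory<K, V> implements KafkaConsumerFactory<K, V> {
        // Last consumer created, kept so a test can inspect it (or a mock could be returned instead).
        public KafkaConsumer<K, V> lastConsumer;

        @Override
        public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
            lastConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
            return lastConsumer;
        }
    }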
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
index 7900388..c283815 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.kafka.spout.internal;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;