You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/21 10:41:51 UTC
[2/5] storm git commit: STORM-2515: Fix most checkstyle violations in storm-kafka-client
STORM-2515: Fix most checkstyle violations in storm-kafka-client
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d8f5d6e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d8f5d6e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d8f5d6e
Branch: refs/heads/master
Commit: 4d8f5d6edfd9c9a405cb4c6ea84312752e21fd6e
Parents: 2a0c168
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon May 15 21:47:04 2017 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 19:30:44 2017 +0900
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 2 +-
.../org/apache/storm/kafka/bolt/KafkaBolt.java | 43 +++---
.../FieldNameBasedTupleToKafkaMapper.java | 1 +
.../kafka/bolt/mapper/TupleToKafkaMapper.java | 6 +-
.../bolt/selector/DefaultTopicSelector.java | 1 +
.../bolt/selector/FieldIndexTopicSelector.java | 6 +
.../bolt/selector/FieldNameTopicSelector.java | 1 +
.../kafka/bolt/selector/KafkaTopicSelector.java | 4 +-
.../kafka/spout/ByTopicRecordTranslator.java | 23 +--
.../kafka/spout/DefaultRecordTranslator.java | 3 +-
.../java/org/apache/storm/kafka/spout/Func.java | 3 +-
.../apache/storm/kafka/spout/KafkaSpout.java | 73 ++++++----
.../storm/kafka/spout/KafkaSpoutConfig.java | 143 +++++++++++--------
.../storm/kafka/spout/KafkaSpoutMessageId.java | 40 ++++--
.../KafkaSpoutRetryExponentialBackoff.java | 56 ++++----
.../kafka/spout/KafkaSpoutRetryService.java | 12 +-
.../apache/storm/kafka/spout/KafkaTuple.java | 8 +-
.../spout/ManualPartitionNamedSubscription.java | 2 +-
.../ManualPartitionPatternSubscription.java | 4 +-
.../storm/kafka/spout/ManualPartitioner.java | 2 +-
.../storm/kafka/spout/NamedSubscription.java | 4 +-
.../storm/kafka/spout/PatternSubscription.java | 4 +-
.../storm/kafka/spout/RecordTranslator.java | 16 +--
.../spout/RoundRobinManualPartitioner.java | 6 +-
.../kafka/spout/SerializableDeserializer.java | 1 +
.../kafka/spout/SimpleRecordTranslator.java | 8 +-
.../apache/storm/kafka/spout/Subscription.java | 2 +-
.../kafka/spout/TopicPartitionComparator.java | 6 +-
.../spout/internal/KafkaConsumerFactory.java | 3 +-
.../internal/KafkaConsumerFactoryDefault.java | 1 +
.../kafka/spout/internal/OffsetManager.java | 25 +++-
.../storm/kafka/spout/internal/Timer.java | 3 +-
.../trident/KafkaTridentSpoutBatchMetadata.java | 29 ++--
.../spout/trident/KafkaTridentSpoutEmitter.java | 56 ++++----
.../spout/trident/KafkaTridentSpoutManager.java | 15 +-
.../spout/trident/KafkaTridentSpoutOpaque.java | 12 +-
.../KafkaTridentSpoutOpaqueCoordinator.java | 16 +--
.../KafkaTridentSpoutTopicPartition.java | 13 +-
...KafkaTridentSpoutTopicPartitionRegistry.java | 3 +-
.../trident/KafkaTridentSpoutTransactional.java | 3 +-
.../storm/kafka/trident/TridentKafkaState.java | 78 +++++-----
.../kafka/trident/TridentKafkaStateFactory.java | 12 +-
.../FieldNameBasedTupleToKafkaMapper.java | 1 +
.../mapper/TridentTupleToKafkaMapper.java | 7 +-
.../trident/selector/DefaultTopicSelector.java | 1 +
.../trident/selector/KafkaTopicSelector.java | 4 +-
.../java/org/apache/storm/kafka/KafkaUnit.java | 21 ++-
.../org/apache/storm/kafka/KafkaUnitRule.java | 3 +-
.../apache/storm/kafka/bolt/KafkaBoltTest.java | 1 -
.../spout/ByTopicRecordTranslatorTest.java | 1 -
.../spout/DefaultRecordTranslatorTest.java | 1 -
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 9 +-
.../storm/kafka/spout/KafkaSpoutEmitTest.java | 16 +--
.../kafka/spout/KafkaSpoutRebalanceTest.java | 1 -
.../kafka/spout/MaxUncommittedOffsetTest.java | 1 -
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 31 ++--
.../kafka/spout/test/KafkaSpoutTestBolt.java | 3 +-
.../test/KafkaSpoutTopologyMainNamedTopics.java | 1 -
.../KafkaSpoutTopologyMainWildcardTopics.java | 1 -
59 files changed, 470 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index a275428..843868f 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -152,7 +152,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>342</maxAllowedViolations>
+ <maxAllowedViolations>9</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
index 1a401b2..2ff1a54 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -15,29 +15,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.bolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
-import java.util.concurrent.Future;
-import java.util.concurrent.ExecutionException;
-import java.util.Map;
-import java.util.Properties;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -81,7 +80,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
}
/**
- * Set the messages to be published to a single topic
+ * Set the messages to be published to a single topic.
* @param topic the topic to publish to
* @return this
*/
@@ -139,7 +138,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
key = mapper.getKeyFromTuple(input);
message = mapper.getMessageFromTuple(input);
topic = topicSelector.getTopic(input);
- if (topic != null ) {
+ if (topic != null) {
Callback callback = null;
if (!fireAndForget && async) {
@@ -194,7 +193,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
* the tuple as soon as it has handed the message off to the producer API
* if false (the default) the message will be acked after it was successfully sent to kafka or
* failed if it was not successfully sent.
- * @param fireAndForget
+ * @param fireAndForget whether the bolt should fire and forget
*/
public void setFireAndForget(boolean fireAndForget) {
this.fireAndForget = fireAndForget;
@@ -211,10 +210,10 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
@Override
public String toString() {
- return "KafkaBolt: {mapper: " + mapper +
- " topicSelector: " + topicSelector +
- " fireAndForget: " + fireAndForget +
- " async: " + async +
- " proerties: " + boltSpecifiedProperties;
+ return "KafkaBolt: {mapper: " + mapper
+ + " topicSelector: " + topicSelector
+ + " fireAndForget: " + fireAndForget
+ + " async: " + async
+ + " proerties: " + boltSpecifiedProperties;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
index f7638aa..0302c57 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.bolt.mapper;
import org.apache.storm.tuple.Tuple;
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
index 9f11fc9..531c60c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
@@ -15,11 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.kafka.bolt.mapper;
-import org.apache.storm.tuple.Tuple;
+package org.apache.storm.kafka.bolt.mapper;
import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
/**
* Interface defining a mapping from storm tuple to kafka key and message.
@@ -27,6 +27,8 @@ import java.io.Serializable;
* @param <V> type of value.
*/
public interface TupleToKafkaMapper<K,V> extends Serializable {
+
K getKeyFromTuple(Tuple tuple);
+
V getMessageFromTuple(Tuple tuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
index 3d00fc1..4adc466 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.bolt.selector;
import org.apache.storm.tuple.Tuple;
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
index ffe0b35a..e6c35ce 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.bolt.selector;
import org.apache.storm.tuple.Tuple;
@@ -32,6 +33,11 @@ public class FieldIndexTopicSelector implements KafkaTopicSelector {
private final int fieldIndex;
private final String defaultTopicName;
+ /**
+ * Creates a new FieldIndexTopicSelector.
+ * @param fieldIndex The index of the field containing the topic name
+ * @param defaultTopicName The default topic name if the topic name cannot be read from the tuple
+ */
public FieldIndexTopicSelector(int fieldIndex, String defaultTopicName) {
this.fieldIndex = fieldIndex;
if (fieldIndex < 0) {
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
index e90b26f..c8fa74c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.bolt.selector;
import org.apache.storm.tuple.Tuple;
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
index cb7fb44..cd6dd29 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
@@ -15,11 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.kafka.bolt.selector;
-import org.apache.storm.tuple.Tuple;
+package org.apache.storm.kafka.bolt.selector;
import java.io.Serializable;
+import org.apache.storm.tuple.Tuple;
public interface KafkaTopicSelector extends Serializable {
String getTopic(Tuple tuple);
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
index 855ab30..71f8e9e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
@@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.tuple.Fields;
/**
* Based off of a given Kafka topic a ConsumerRecord came from it will be translated to a Storm tuple
- * and emitted to a given stream
+ * and emitted to a given stream.
* @param <K> the key of the incoming Records
* @param <V> the value of the incoming Records
*/
@@ -62,7 +62,7 @@ public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
/**
* @param defaultTranslator a translator that will be used for all topics not explicitly set
- * elsewhere.
+ * elsewhere.
*/
public ByTopicRecordTranslator(RecordTranslator<K,V> defaultTranslator) {
this.defaultTranslator = defaultTranslator;
@@ -77,7 +77,8 @@ public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
* @param fields the names of the fields extracted
* @return this to be able to chain configuration
* @throws IllegalStateException if the topic is already registered to another translator
- * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+ * @throws IllegalArgumentException if the Fields for the stream this emits to do not match
+ * any already configured Fields for the same stream
*/
public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
return forTopic(topic, new SimpleRecordTranslator<>(func, fields));
@@ -91,19 +92,22 @@ public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
* @param stream the stream to emit the tuples to.
* @return this to be able to chain configuration
* @throws IllegalStateException if the topic is already registered to another translator
- * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+ * @throws IllegalArgumentException if the Fields for the stream this emits to do not match
+ * any already configured Fields for the same stream
*/
- public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+ public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>,
+ List<Object>> func, Fields fields, String stream) {
return forTopic(topic, new SimpleRecordTranslator<>(func, fields, stream));
}
/**
- * Configure a translator for a given kafka topic
+ * Configure a translator for a given kafka topic.
* @param topic the topic this translator should handle
* @param translator the translator itself
* @return this to be able to chain configuration
* @throws IllegalStateException if the topic is already registered to another translator
- * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+ * @throws IllegalArgumentException if the Fields for the stream this emits to do not match
+ * any already configured Fields for the same stream
*/
public ByTopicRecordTranslator<K, V> forTopic(String topic, RecordTranslator<K,V> translator) {
if (topicToTranslator.containsKey(topic)) {
@@ -119,7 +123,8 @@ public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
Fields fromTrans = translator.getFieldsFor(stream);
Fields cached = streamToFields.get(stream);
if (cached != null && !fromTrans.equals(cached)) {
- throw new IllegalArgumentException("Stream " + stream + " currently has Fields of " + cached + " which is not the same as those being added in " + fromTrans);
+ throw new IllegalArgumentException("Stream " + stream + " currently has Fields of "
+ + cached + " which is not the same as those being added in " + fromTrans);
}
if (cached == null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
index f40ab18..b08b541 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.List;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
@@ -26,6 +26,7 @@ import org.apache.storm.tuple.Values;
public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
private static final long serialVersionUID = -5782462870112305750L;
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
+
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
return new Values(record.topic(),
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
index 0414533..643bc9d 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.io.Serializable;
/**
* A simple interface to allow compatibility with non java 8
- * code bases
+ * code bases.
*/
public interface Func<V, R> extends Serializable {
R apply(V record);
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index bb76535..72b8d14 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -34,7 +34,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -71,17 +70,27 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient boolean consumerAutoCommitMode;
// Bookkeeping
- private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation
- private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
- private transient Timer commitTimer; // timer == null for auto commit mode
- private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
+ // Strategy to determine the fetch offset of the first realized by the spout upon activation
+ private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+ // Class that has the logic to handle tuple failure
+ private transient KafkaSpoutRetryService retryService;
+ // timer == null for auto commit mode
+ private transient Timer commitTimer;
+ // Flag indicating that the spout is still undergoing initialization process.
+ private transient boolean initialized;
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
- private transient Map<TopicPartition, OffsetManager> offsetManagers;// Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
- private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
- private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
- private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
- private transient Timer refreshSubscriptionTimer; // Triggers when a subscription should be refreshed
+ // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
+ //or after a consumer rebalance, or during close/deactivate
+ private transient Map<TopicPartition, OffsetManager> offsetManagers;
+ // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
+ private transient Set<KafkaSpoutMessageId> emitted;
+ // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+ private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
+ // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
+ private transient long numUncommittedOffsets;
+ // Triggers when a subscription should be refreshed
+ private transient Timer refreshSubscriptionTimer;
private transient TopologyContext context;
@@ -147,7 +156,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void initialize(Collection<TopicPartition> partitions) {
if (!consumerAutoCommitMode) {
- offsetManagers.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
+ // remove from acked all partitions that are no longer assigned to this spout
+ offsetManagers.keySet().retainAll(partitions);
}
retryService.retainAll(partitions);
@@ -175,7 +185,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
/**
- * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+ * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
*/
private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
long fetchOffset;
@@ -252,19 +262,22 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean poll() {
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
final int readyMessageCount = retryService.readyMessageCount();
- final boolean poll = !waitingToEmit() &&
+ final boolean poll = !waitingToEmit()
//Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
- //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, and prevents locking up the spout when there are too many retriable tuples
- (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets ||
- consumerAutoCommitMode);
+ //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
+ //and prevents locking up the spout when there are too many retriable tuples
+ && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
+ || consumerAutoCommitMode);
if (!poll) {
if (waitingToEmit()) {
- LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
+ LOG.debug("Not polling. Tuples waiting to be emitted."
+ + " [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
}
if (numUncommittedOffsets >= maxUncommittedOffsets && !consumerAutoCommitMode) {
- LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
+ LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]",
+ numUncommittedOffsets, maxUncommittedOffsets);
}
}
return poll;
@@ -274,7 +287,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
return waitingToEmit != null && waitingToEmit.hasNext();
}
- public void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
+ private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
List<ConsumerRecord<K, V>> waitingToEmitList = new LinkedList<>();
for (TopicPartition tp : consumerRecords.partitions()) {
waitingToEmitList.addAll(consumerRecords.records(tp));
@@ -290,7 +303,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
final int numPolledRecords = consumerRecords.count();
- LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
+ LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions",
+ numPolledRecords, numUncommittedOffsets);
return consumerRecords;
}
@@ -363,7 +377,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
/**
- * Emits a tuple if it is not a null tuple, or if the spout is configured to emit null tuples
+ * Emits a tuple if it is not a null tuple, or if the spout is configured to emit null tuples.
*/
private boolean isEmitTuple(List<Object> tuple) {
return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
@@ -406,9 +420,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
if (msgId.isEmitted()) {
- LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that " +
- "came from a topic-partition that this consumer group instance is no longer tracking " +
- "due to rebalance/partition reassignment. No action taken.", msgId);
+ LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
+ + "came from a topic-partition that this consumer group instance is no longer tracking "
+ + "due to rebalance/partition reassignment. No action taken.", msgId);
} else {
LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
}
@@ -426,7 +440,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
public void fail(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
- LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+ LOG.debug("Received fail for tuple this spout is no longer tracking."
+ + " Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
emitted.remove(msgId);
@@ -493,10 +508,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public String toString() {
- return "KafkaSpout{" +
- "offsetManagers =" + offsetManagers +
- ", emitted=" + emitted +
- "}";
+ return "KafkaSpout{"
+ + "offsetManagers =" + offsetManagers
+ + ", emitted=" + emitted
+ + "}";
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index e89ef93..43a6e0b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,13 +18,6 @@
package org.apache.storm.kafka.spout;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.tuple.Fields;
-
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
@@ -32,17 +25,28 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.tuple.Fields;
/**
- * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
+ * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
*/
public class KafkaSpoutConfig<K, V> implements Serializable {
private static final long serialVersionUID = 141902646130682494L;
- public static final long DEFAULT_POLL_TIMEOUT_MS = 200; // 200ms
- public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // 30s
- public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever
- public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 10,000,000 records => 80MBs of memory footprint in the worst case
- public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s
+ // 200ms
+ public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
+ // 30s
+ public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
+ // Retry forever
+ public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
+ // 10,000,000 records => 80MBs of memory footprint in the worst case
+ public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
+ // 2s
+ public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
@@ -50,22 +54,25 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* Retry in a tight loop (keep unit tests fasts) do not use in production.
*/
public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
- new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
+ new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
/**
- * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will
- * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
+ * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer
+ * records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
* The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
* <ul>
- * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
- * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
- * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
- * If no offset has been committed, it behaves as EARLIEST.</li>
- * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
- * If no offset has been committed, it behaves as LATEST.</li>
+ * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous
+ * commits</li>
+ * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of
+ * previous commits</li>
+ * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
+ * committed, it behaves as EARLIEST.</li>
+ * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any. If no offset has been
+ * committed, it behaves as LATEST.</li>
* </ul>
- * */
+ *
+ */
public static enum FirstPollOffsetStrategy {
EARLIEST,
LATEST,
@@ -116,35 +123,43 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
}
- public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
+ SerializableDeserializer<V> valDes, Collection<String> topics) {
this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
}
- public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
+ SerializableDeserializer<V> valDes, Pattern topics) {
this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
}
- public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
+ SerializableDeserializer<V> valDes, Subscription subscription) {
this(bootstrapServers, keyDes, null, valDes, null, subscription);
}
- public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) {
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
+ Class<? extends Deserializer<V>> valDes, String ... topics) {
this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
}
- public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
+ Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
}
- public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
+ Class<? extends Deserializer<V>> valDes, Pattern topics) {
this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
}
- public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
+ Class<? extends Deserializer<V>> valDes, Subscription subscription) {
this(bootstrapServers, null, keyDes, null, valDes, subscription);
}
- private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+ private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
+ Class<? extends Deserializer<K>> keyDesClazz,
SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
kafkaProps = new HashMap<>();
if (bootstrapServers == null || bootstrapServers.isEmpty()) {
@@ -184,7 +199,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* set a custom RecordTranslator before calling this it may result in class cast
* exceptions at runtime.
*/
- public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+ public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
}
@@ -194,7 +209,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* set a custom RecordTranslator before calling this it may result in class cast
* exceptions at runtime.
*/
- public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
+ public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
}
@@ -203,7 +218,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* set a custom RecordTranslator before calling this it may result in class cast
* exceptions at runtime.
*/
- public <NV> Builder<K,NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
+ public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
}
@@ -213,12 +228,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* set a custom RecordTranslator before calling this it may result in class cast
* exceptions at runtime.
*/
- public <NV> Builder<K,NV> setValue(Class<? extends Deserializer<NV>> clazz) {
+ public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
}
/**
- * Set a Kafka property config
+ * Set a Kafka property config.
*/
public Builder<K,V> setProp(String key, Object value) {
kafkaProps.put(key, value);
@@ -226,7 +241,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Set multiple Kafka property configs
+ * Set multiple Kafka property configs.
*/
public Builder<K,V> setProp(Map<String, Object> props) {
kafkaProps.putAll(props);
@@ -234,7 +249,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Set multiple Kafka property configs
+ * Set multiple Kafka property configs.
*/
public Builder<K,V> setProp(Properties props) {
for (String name: props.stringPropertyNames()) {
@@ -244,14 +259,14 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Set the group.id for the consumers
+ * Set the group.id for the consumers.
*/
public Builder<K,V> setGroupId(String id) {
return setProp("group.id", id);
}
/**
- * reset the bootstrap servers for the Consumer
+ * reset the bootstrap servers for the Consumer.
*/
public Builder<K,V> setBootstrapServers(String servers) {
return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
@@ -281,7 +296,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
//Security Related Configs
/**
- * Configure the SSL Keystore for mutual authentication
+ * Configure the SSL Keystore for mutual authentication.
*/
public Builder<K,V> setSSLKeystore(String location, String password) {
return setProp("ssl.keystore.location", location)
@@ -289,7 +304,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Configure the SSL Keystore for mutual authentication
+ * Configure the SSL Keystore for mutual authentication.
*/
public Builder<K,V> setSSLKeystore(String location, String password, String keyPassword) {
return setProp("ssl.key.password", keyPassword)
@@ -297,7 +312,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Configure the SSL Truststore to authenticate with the brokers
+ * Configure the SSL Truststore to authenticate with the brokers.
*/
public Builder<K,V> setSSLTruststore(String location, String password) {
return setSecurityProtocol("SSL")
@@ -315,7 +330,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
//Spout Settings
/**
- * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
+ * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
* @param pollTimeoutMs time in ms
*/
public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
@@ -439,6 +454,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private final long partitionRefreshPeriodMs;
private final boolean emitNullTuples;
+ /**
+ * Creates a new KafkaSpoutConfig using a Builder.
+ * @param builder The Builder to construct the KafkaSpoutConfig from
+ */
public KafkaSpoutConfig(Builder<K,V> builder) {
this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
this.subscription = builder.subscription;
@@ -456,10 +475,18 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.emitNullTuples = builder.emitNullTuples;
}
+ /**
+ * Gets the properties that will be passed to the KafkaConsumer.
+ * @return The Kafka properties map
+ */
public Map<String, Object> getKafkaProps() {
return kafkaProps;
}
+ /**
+ * Gets the Kafka record key deserializer.
+ * @return The key deserializer
+ */
public Deserializer<K> getKeyDeserializer() {
if (keyDesClazz != null) {
try {
@@ -471,6 +498,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return keyDes;
}
+ /**
+ * Gets the Kafka record value deserializer.
+ * @return The value deserializer
+ */
public Deserializer<V> getValueDeserializer() {
if (valueDesClazz != null) {
try {
@@ -529,17 +560,17 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
@Override
public String toString() {
- return "KafkaSpoutConfig{" +
- "kafkaProps=" + kafkaProps +
- ", key=" + getKeyDeserializer() +
- ", value=" + getValueDeserializer() +
- ", pollTimeoutMs=" + pollTimeoutMs +
- ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
- ", maxUncommittedOffsets=" + maxUncommittedOffsets +
- ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
- ", subscription=" + subscription +
- ", translator=" + translator +
- ", retryService=" + retryService +
- '}';
+ return "KafkaSpoutConfig{"
+ + "kafkaProps=" + kafkaProps
+ + ", key=" + getKeyDeserializer()
+ + ", value=" + getValueDeserializer()
+ + ", pollTimeoutMs=" + pollTimeoutMs
+ + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
+ + ", maxUncommittedOffsets=" + maxUncommittedOffsets
+ + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
+ + ", subscription=" + subscription
+ + ", translator=" + translator
+ + ", retryService=" + retryService
+ + '}';
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index dea57c4..e2ab5b7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -25,8 +25,11 @@ public class KafkaSpoutMessageId {
private transient TopicPartition topicPart;
private transient long offset;
private transient int numFails = 0;
- private boolean emitted; // true if the record was emitted using a form of collector.emit(...).
- // false when skipping null tuples as configured by the user in KafkaSpoutConfig
+ /**
+ * true if the record was emitted using a form of collector.emit(...). false
+ * when skipping null tuples as configured by the user in KafkaSpoutConfig
+ */
+ private boolean emitted;
public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
this(consumerRecord, true);
@@ -40,6 +43,12 @@ public class KafkaSpoutMessageId {
this(topicPart, offset, true);
}
+ /**
+ * Creates a new KafkaSpoutMessageId.
+ * @param topicPart The topic partition this message belongs to
+ * @param offset The offset of this message
+ * @param emitted True iff this message is not being skipped as a null tuple
+ */
public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) {
this.topicPart = topicPart;
this.offset = offset;
@@ -78,22 +87,27 @@ public class KafkaSpoutMessageId {
this.emitted = emitted;
}
+ /**
+ * Gets metadata for this message which may be committed to Kafka.
+ * @param currThread The calling thread
+ * @return The metadata
+ */
public String getMetadata(Thread currThread) {
- return "{" +
- "topic-partition=" + topicPart +
- ", offset=" + offset +
- ", numFails=" + numFails +
- ", thread='" + currThread.getName() + "'" +
- '}';
+ return "{"
+ + "topic-partition=" + topicPart
+ + ", offset=" + offset
+ + ", numFails=" + numFails
+ + ", thread='" + currThread.getName() + "'"
+ + '}';
}
@Override
public String toString() {
- return "{" +
- "topic-partition=" + topicPart +
- ", offset=" + offset +
- ", numFails=" + numFails +
- '}';
+ return "{"
+ + "topic-partition=" + topicPart
+ + ", offset=" + offset
+ + ", numFails=" + numFails
+ + '}';
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 5c6833a..60c34dc 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -18,10 +18,6 @@
package org.apache.storm.kafka.spout;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
@@ -32,7 +28,10 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.TopicPartition;
import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
@@ -52,14 +51,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
/**
- * Comparator ordering by timestamp
+ * Comparator ordering by timestamp.
*/
private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
@Override
public int compare(RetrySchedule entry1, RetrySchedule entry2) {
int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
- if(result == 0) {
+ if (result == 0) {
//TreeSet uses compareTo instead of equals() for the Set contract
//Ensure that we can save two retry schedules with the same timestamp
result = entry1.hashCode() - entry2.hashCode();
@@ -89,10 +88,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
@Override
public String toString() {
- return "RetrySchedule{" +
- "msgId=" + msgId +
- ", nextRetryTimeNanos=" + nextRetryTimeNanos +
- '}';
+ return "RetrySchedule{"
+ + "msgId=" + msgId
+ + ", nextRetryTimeNanos=" + nextRetryTimeNanos
+ + '}';
}
public KafkaSpoutMessageId msgId() {
@@ -110,6 +109,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
private final long length;
/**
+ * Creates a new TimeInterval.
* @param length length of the time interval in the units specified by {@link TimeUnit}
* @param timeUnit unit used to specify a time interval on which to specify a time unit
*/
@@ -141,20 +141,20 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
@Override
public String toString() {
- return "TimeInterval{" +
- "length=" + length +
- ", timeUnit=" + timeUnit +
- '}';
+ return "TimeInterval{"
+ + "length=" + length
+ + ", timeUnit=" + timeUnit
+ + '}';
}
}
/**
- * The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
- * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
- * nextRetry = Min(nextRetry, currentTime + maxDelay).
- *
- * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
- * polled records in favor of processing more records.
+ * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1),
+ * where failCount = 1, 2, 3, ... nextRetry = Min(nextRetry, currentTime + maxDelay).
+ * <p/>
+ * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the
+ * previous polled records in favor of processing more records.
*
* @param initialDelay initial delay of the first retry
* @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
@@ -237,7 +237,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
final RetrySchedule retrySchedule = rsIterator.next();
final KafkaSpoutMessageId msgId = retrySchedule.msgId;
- final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition());
+ final TopicPartition tpRetry = new TopicPartition(msgId.topic(), msgId.partition());
if (!topicPartitions.contains(tpRetry)) {
rsIterator.remove();
toRetryMsgs.remove(msgId);
@@ -292,17 +292,17 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
final long currentTimeNanos = Time.nanoTime();
final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
? currentTimeNanos + initialDelay.lengthNanos
- : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
+ : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails() - 1));
return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
}
@Override
public String toString() {
- return "KafkaSpoutRetryExponentialBackoff{" +
- "delay=" + initialDelay +
- ", ratio=" + delayPeriod +
- ", maxRetries=" + maxRetries +
- ", maxRetryDelay=" + maxDelay +
- '}';
+ return "KafkaSpoutRetryExponentialBackoff{"
+ + "delay=" + initialDelay
+ + ", ratio=" + delayPeriod
+ + ", maxRetries=" + maxRetries
+ + ", maxRetryDelay=" + maxDelay
+ + '}';
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index f0230c3..04e4ae7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -18,12 +18,10 @@
package org.apache.storm.kafka.spout;
-import org.apache.kafka.common.TopicPartition;
-
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
/**
* Represents the logic that manages the retrial of failed tuples.
@@ -39,7 +37,7 @@ public interface KafkaSpoutRetryService extends Serializable {
boolean schedule(KafkaSpoutMessageId msgId);
/**
- * Removes a message from the list of messages scheduled for retrial
+ * Removes a message from the list of messages scheduled for retrial.
* @param msgId message to remove from retrial
* @return true if the message was scheduled for retrial, false otherwise
*/
@@ -56,8 +54,8 @@ public interface KafkaSpoutRetryService extends Serializable {
/**
* @return The earliest retriable offset for each TopicPartition that has
- * offsets ready to be retried, i.e. for which a tuple has failed
- * and has retry time less than current time
+ * offsets ready to be retried, i.e. for which a tuple has failed
+ * and has retry time less than current time.
*/
Map<TopicPartition, Long> earliestRetriableOffsets();
@@ -74,7 +72,7 @@ public interface KafkaSpoutRetryService extends Serializable {
* The message may or may not be ready to be retried yet.
* @param msgId message to check for scheduling status
* @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
- * Returns false is this message is not scheduled for retrial
+ * Returns false if this message is not scheduled for retrial
*/
boolean isScheduled(KafkaSpoutMessageId msgId);
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
index f5953ad..02085f2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import org.apache.storm.tuple.Values;
@@ -35,8 +36,13 @@ public class KafkaTuple extends Values {
super(vals);
}
+ /**
+ * Sets the target stream of this Tuple.
+ * @param stream The target stream
+ * @return This
+ */
public KafkaTuple routedTo(String stream) {
- assert(this.stream == null);
+ assert this.stream == null;
this.stream = stream;
return this;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
index 079cadb..926fdf0 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.ArrayList;
@@ -24,7 +25,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
index 5f86605..2344477 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.ArrayList;
@@ -24,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
-
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -55,7 +55,7 @@ public class ManualPartitionPatternSubscription extends PatternSubscription {
@Override
public void refreshAssignment() {
List<TopicPartition> allPartitions = new ArrayList<>();
- for(Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) {
+ for (Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) {
if (pattern.matcher(entry.getKey()).matches()) {
for (PartitionInfo partitionInfo: entry.getValue()) {
allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
index 88803f8..4856687 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.List;
-
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.task.TopologyContext;
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
index 6eba566..0eb48cb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.task.TopologyContext;
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Subscribe to all topics that follow a given list of values
+ * Subscribe to all topics that follow a given list of values.
*/
public class NamedSubscription extends Subscription {
private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
index 9a8de0f..ec53f01 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.regex.Pattern;
-
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.task.TopologyContext;
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Subscribe to all topics that match a given pattern
+ * Subscribe to all topics that match a given pattern.
*/
public class PatternSubscription extends Subscription {
private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
index c79f4e1..e12453a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
@@ -15,16 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.kafka.spout;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.tuple.Fields;
+package org.apache.storm.kafka.spout;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
+import org.apache.storm.tuple.Fields;
/**
* Translate a {@link org.apache.kafka.clients.consumer.ConsumerRecord} to a tuple.
@@ -36,20 +35,21 @@ public interface RecordTranslator<K, V> extends Serializable, Func<ConsumerRecor
* Translate the ConsumerRecord into a list of objects that can be emitted
* @param record the record to translate
* @return the objects in the tuple. Return a {@link KafkaTuple}
- * if you want to route the tuple to a non-default stream.
- * Return null to discard an invalid {@link ConsumerRecord} if {@link Builder#setEmitNullTuples(boolean)} is set to true
+ * if you want to route the tuple to a non-default stream.
+ * Return null to discard an invalid {@link ConsumerRecord} if {@link Builder#setEmitNullTuples(boolean)} is set to true
*/
List<Object> apply(ConsumerRecord<K,V> record);
/**
* Get the fields associated with a stream. The streams passed in are
- * returned by the {@link RecordTranslator.streams} method.
+ * returned by the {@link RecordTranslator#streams() } method.
* @param stream the stream the fields are for
* @return the fields for that stream.
*/
Fields getFieldsFor(String stream);
/**
+ * Get the list of streams this translator will handle.
* @return the list of streams that this will handle.
*/
default List<String> streams() {
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
index e23e2dc..4afcc49 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.task.TopologyContext;
@@ -30,7 +30,7 @@ import org.apache.storm.task.TopologyContext;
* not just the ones that are alive. Because the parallelism of
* the spouts does not typically change while running this makes
* the assignments more stable in the face of crashing spouts.
- *
+ * <p/>
* Round Robin means that first spout of N spouts will get the first
* partition, and the N+1th partition... The second spout will get the second partition and
* N+2th partition etc.
@@ -41,7 +41,7 @@ public class RoundRobinManualPartitioner implements ManualPartitioner {
public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
int thisTaskIndex = context.getThisTaskIndex();
int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
- Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size()/totalTaskCount+1);
+ Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
myPartitions.add(allPartitions.get(i));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
index eb76a90..120260e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
index 46c2849..a294ed4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
@@ -15,11 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.Arrays;
import java.util.List;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.tuple.Fields;
@@ -33,6 +33,12 @@ public class SimpleRecordTranslator<K, V> implements RecordTranslator<K, V> {
this(func, fields, "default");
}
+ /**
+ * Creates a SimpleRecordTranslator.
+ * @param func The mapping function responsible for translating a Kafka record to a Tuple
+ * @param fields The fields tuples constructed by this translator will contain
+ * @param stream The stream tuples constructed by this translator will target
+ */
public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
this.func = func;
this.fields = fields;
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
index db2a7bb..53e825a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.io.Serializable;
-
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.task.TopologyContext;
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
index dafb97c..91b7243 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
@@ -15,24 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout;
import java.util.Comparator;
-
import org.apache.kafka.common.TopicPartition;
/**
* Singleton comparator of TopicPartitions. Topics have precedence over partitions.
* Topics are compared through String.compare and partitions are compared
* numerically.
- *
+ * <p/>
* Use INSTANCE for all sorting.
*/
public class TopicPartitionComparator implements Comparator<TopicPartition> {
public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator();
/**
- * Private to make it a singleton
+ * Private to make it a singleton.
*/
private TopicPartitionComparator() {
//Empty
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
index 0b253b4..fb70927 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout.internal;
import java.io.Serializable;
@@ -20,7 +21,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
/**
- * This is here to enable testing
+ * This is here to enable testing.
*/
public interface KafkaConsumerFactory<K, V> extends Serializable {
public KafkaConsumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
http://git-wip-us.apache.org/repos/asf/storm/blob/4d8f5d6e/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
index 7900388..c283815 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.kafka.spout.internal;
import org.apache.kafka.clients.consumer.KafkaConsumer;