You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/31 21:00:09 UTC
[2/3] storm git commit: STORM-822: Kafka Spout New Consumer API -
Refactored code to avoid keeping records data inside spout state - Refactored
code to specify output fields per stream and build tuples per topic -
Implement exponential backoff retry strategy - Send one tuple per call to nextTuple
STORM-822: Kafka Spout New Consumer API
- Refactored code to avoid keeping records data inside spout state
- Refactored code to specify output fields per stream and build tuples per topic
- Implement exponential backoff retry strategy
- Send one tuple per call to nextTuple
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/332afc40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/332afc40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/332afc40
Branch: refs/heads/master
Commit: 332afc40d8ee7c1f4a4a747280ff83a92c279c5d
Parents: d26b984
Author: Hugo Louro <hm...@gmail.com>
Authored: Mon Mar 21 14:42:50 2016 -0700
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500
----------------------------------------------------------------------
.../kafka/spout/KafkaRecordTupleBuilder.java | 44 ---
.../apache/storm/kafka/spout/KafkaSpout.java | 160 +++++++----
.../storm/kafka/spout/KafkaSpoutConfig.java | 87 +++---
.../storm/kafka/spout/KafkaSpoutMessageId.java | 16 +-
.../KafkaSpoutRetryExponentialBackoff.java | 281 +++++++++++++++++++
.../kafka/spout/KafkaSpoutRetryService.java | 72 +++++
.../storm/kafka/spout/KafkaSpoutStream.java | 14 +-
.../storm/kafka/spout/KafkaSpoutStreams.java | 26 +-
.../kafka/spout/KafkaSpoutTupleBuilder.java | 34 ++-
.../kafka/spout/KafkaSpoutTuplesBuilder.java | 82 ++++++
.../kafka/spout/test/KafkaSpoutTestBolt.java | 50 ++++
.../spout/test/KafkaSpoutTopologyMain.java | 37 ++-
.../storm/kafka/spout/test/KafkaTestBolt.java | 52 ----
.../spout/test/TopicTest2TupleBuilder.java | 40 +++
.../test/TopicsTest0Test1TupleBuilder.java | 42 +++
15 files changed, 798 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
deleted file mode 100644
index 4d67632..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> {
- @Override
- public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
- final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic());
- if (outputFields != null) {
- if (outputFields.size() == 3) {
- return new Values(consumerRecord.topic(),
- consumerRecord.partition(),
- consumerRecord.offset());
- } else if (outputFields.size() == 5) {
- return new Values(consumerRecord.topic(),
- consumerRecord.partition(),
- consumerRecord.offset(),
- consumerRecord.key(),
- consumerRecord.value());
- }
- }
- throw new RuntimeException("Failed to build tuple. " + consumerRecord + " " + kafkaSpoutStreams);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9a49ee8..d211ae9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -34,12 +33,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@@ -62,23 +65,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Bookkeeping
- private KafkaSpoutStreams kafkaSpoutStreams;
- private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
- private transient Timer commitTimer; // timer == null for auto commit mode
- private transient Timer logTimer;
- private transient Map<TopicPartition, OffsetEntry> acked; // emitted tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate
- private transient int maxRetries; // Max number of times a tuple is retried
- private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
+ private transient int maxRetries; // Max number of times a tuple is retried
+ private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation
+ private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
+ private transient Timer commitTimer; // timer == null for auto commit mode
+ private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
- private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
- private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
- private transient PollStrategy pollStrategy;
+ private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples
+ private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord
- public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+ private transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+ private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
+ private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+ private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
+
+
+ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
- this.tupleBuilder = tupleBuilder;
}
@Override
@@ -89,18 +94,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
this.collector = collector;
maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
numUncommittedOffsets = 0;
- logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
- pollStrategy = kafkaSpoutConfig.getPollStrategy();
consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+ // Retries management
+ retryService = kafkaSpoutConfig.getRetryService();
+
+ // Tuples builder delegate
+ tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+
if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually
commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
- acked = new HashMap<>();
}
+ acked = new HashMap<>();
+ emitted = new HashSet<>();
+ waitingToEmit = Collections.emptyListIterator();
+
LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}
@@ -130,6 +142,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
}
+ retryService.retainAll(partitions);
+
for (TopicPartition tp : partitions) {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
final long fetchOffset = doSeek(tp, committedOffset);
@@ -182,67 +196,88 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (initialized) {
if (commit()) {
commitOffsetsForAckedTuples();
- } else if (poll()) {
- emitTuples(pollKafkaBroker());
- } else if (logTimer.isExpiredResetOnTrue()) { // to limit the number of messages that get printed.
- log();
+ }
+
+ if (poll()) {
+ setWaitingToEmit(pollKafkaBroker());
+ }
+
+ if (waitingToEmit()) {
+ emit();
}
} else {
LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
}
}
- private void log() {
- switch(pollStrategy) {
- case STREAM:
- LOG.trace("Reached the maximum number number of uncommitted records [{}]. " +
- "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ",
- numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets());
- break;
- case BATCH:
- LOG.trace("No more polls will occur until the last batch completes. [{}] emitted tuples pending", numUncommittedOffsets);
- break;
- default:
- throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
- }
+ private boolean commit() {
+ return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
+ }
+ private boolean poll() {
+ return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
}
- // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory
- private boolean poll() {
- switch(pollStrategy) {
- case STREAM:
- return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
- case BATCH:
- return consumerAutoCommitMode || numUncommittedOffsets <= 0;
- default:
- throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
- }
+ private boolean waitingToEmit() {
+ return waitingToEmit != null && waitingToEmit.hasNext();
}
- private boolean commit() {
- return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
+ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
+ List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
+ for (TopicPartition tp : consumerRecords.partitions()) {
+ waitingToEmitList.addAll(consumerRecords.records(tp));
+ }
+ waitingToEmit = waitingToEmitList.iterator();
+ LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
}
+ // ======== poll =========
private ConsumerRecords<K, V> pollKafkaBroker() {
+ doSeekRetriableTopicPartitions();
+
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
final int numPolledRecords = consumerRecords.count();
- numUncommittedOffsets+= numPolledRecords;
LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
return consumerRecords;
}
- private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
- for (TopicPartition tp : consumerRecords.partitions()) {
- final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic());
+ private void doSeekRetriableTopicPartitions() {
+ final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+
+ for (TopicPartition rtp : retriableTopicPartitions) {
+ final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
+ if (offsetAndMeta != null) {
+ kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
+ } else {
+ kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
+ }
+ }
+ }
- for (final ConsumerRecord<K, V> record : records) {
- final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams);
- final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple);
+ // ======== emit =========
+ private void emit() {
+ emitTupleIfNotEmitted(waitingToEmit.next());
+ waitingToEmit.remove();
+ }
- kafkaSpoutStreams.emit(collector, messageId); // emits one tuple per record
- LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+ // emits one tuple per record
+ private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+ final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+
+ if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
+ LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
+ } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
+ LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
+ } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+ final List<Object> tuple = tuplesBuilder.buildTuple(record);
+ kafkaSpoutStreams.emit(collector, tuple, msgId);
+ emitted.add(msgId);
+ numUncommittedOffsets++;
+ if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
+ retryService.remove(msgId); // re-emitted hence remove from failed
}
+ LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
}
@@ -275,11 +310,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void ack(Object messageId) {
+ final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
- final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
acked.get(msgId.getTopicPartition()).add(msgId);
LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
}
+ emitted.remove(msgId);
}
// ======== Fail =======
@@ -287,10 +323,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void fail(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+ emitted.remove(msgId);
if (msgId.numFails() < maxRetries) {
msgId.incrementNumFails();
- kafkaSpoutStreams.emit(collector, msgId);
- LOG.trace("Retried tuple with message id [{}]", msgId);
+ retryService.schedule(msgId);
} else { // limit to max number of retries
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
ack(msgId);
@@ -367,7 +403,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
this.tp = tp;
this.initialFetchOffset = initialFetchOffset;
this.committedOffset = initialFetchOffset - 1;
- LOG.debug("Created OffsetEntry. {}", this);
+ LOG.debug("Instantiated {}", this);
}
public void add(KafkaSpoutMessageId msgId) { // O(Log N)
@@ -434,6 +470,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
return ackedMsgs.isEmpty();
}
+ public boolean contains(ConsumerRecord record) {
+ return contains(new KafkaSpoutMessageId(record));
+ }
+
+ public boolean contains(KafkaSpoutMessageId msgId) {
+ return ackedMsgs.contains(msgId);
+ }
+
@Override
public String toString() {
return "OffsetEntry{" +
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index d969f1f..29cedb2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,7 +18,9 @@
package org.apache.storm.kafka.spout;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import java.io.Serializable;
import java.util.ArrayList;
@@ -63,24 +65,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
UNCOMMITTED_EARLIEST,
UNCOMMITTED_LATEST }
- /**
- * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
- * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
- * (it stores in memory the entire KafkaRecord,including data). BATCH will likely have less throughput but also use less memory.
- * The BATCH behavior is similar to the behavior of the previous Kafka Spout. De default value is STREAM.
- * <ul>
- * <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
- * threshold has not yet been reached. When the threshold his reached, no more records are polled until enough offsets have been
- * committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
- * </li>
- * <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll have been acked.</li>
- * </ul>
- */
- public enum PollStrategy {
- STREAM,
- BATCH
- }
-
// Kafka consumer configuration
private final Map<String, Object> kafkaProps;
private final Deserializer<K> keyDeserializer;
@@ -92,8 +76,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private final int maxRetries;
private final int maxUncommittedOffsets;
private final FirstPollOffsetStrategy firstPollOffsetStrategy;
- private final PollStrategy pollStrategy;
private final KafkaSpoutStreams kafkaSpoutStreams;
+ private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
+ private final KafkaSpoutRetryService retryService;
private KafkaSpoutConfig(Builder<K,V> builder) {
this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
@@ -103,9 +88,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
this.maxRetries = builder.maxRetries;
this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
- this.pollStrategy = builder.pollStrategy;
this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+ this.tuplesBuilder = builder.tuplesBuilder;
+ this.retryService = builder.retryService;
}
private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
@@ -117,33 +103,61 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
public static class Builder<K,V> {
- private Map<String, Object> kafkaProps;
+ private final Map<String, Object> kafkaProps;
private Deserializer<K> keyDeserializer;
private Deserializer<V> valueDeserializer;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
private int maxRetries = DEFAULT_MAX_RETRIES;
private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
- private KafkaSpoutStreams kafkaSpoutStreams;
+ private final KafkaSpoutStreams kafkaSpoutStreams;
private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
- private PollStrategy pollStrategy = PollStrategy.STREAM;
+ private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
+ private final KafkaSpoutRetryService retryService;
+
+ /**
+ * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
+ * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/>
+ * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+ * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
+ */
+ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+ KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
+ this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
+ new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+ DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
+ }
/***
* KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
* The optional configuration can be specified using the set methods of this builder
* @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
* @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
+ * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
+ * @param retryService logic that manages the retrial of failed tuples
*/
- public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) {
+ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+ KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
if (kafkaProps == null || kafkaProps.isEmpty()) {
- throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps);
+ throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
}
if (kafkaSpoutStreams == null) {
- throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream.");
+ throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
+ }
+
+ if (tuplesBuilder == null) {
+ throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
}
+
+ if (retryService == null) {
+ throw new IllegalArgumentException("Must specify at implementation of retry service");
+ }
+
this.kafkaProps = kafkaProps;
this.kafkaSpoutStreams = kafkaSpoutStreams;
+ this.tuplesBuilder = tuplesBuilder;
+ this.retryService = retryService;
}
/**
@@ -214,16 +228,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return this;
}
- /**
- * Sets the strategy used by the the Kafka spout to decide when to poll the next batch of records from Kafka.
- * Please refer to to the documentation in {@link PollStrategy}
- * @param pollStrategy strategy used to decide when to poll
- * */
- public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) {
- this.pollStrategy = pollStrategy;
- return this;
- }
-
public KafkaSpoutConfig<K,V> build() {
return new KafkaSpoutConfig<>(this);
}
@@ -258,6 +262,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return (String) kafkaProps.get(Consumer.GROUP_ID);
}
+ /**
+ * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}
+ */
public List<String> getSubscribedTopics() {
return new ArrayList<>(kafkaSpoutStreams.getTopics());
}
@@ -278,8 +285,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return maxUncommittedOffsets;
}
- public PollStrategy getPollStrategy() {
- return pollStrategy;
+ public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
+ return tuplesBuilder;
+ }
+
+ public KafkaSpoutRetryService getRetryService() {
+ return retryService;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 0a6b126..71f8327 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -21,23 +21,18 @@ package org.apache.storm.kafka.spout;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
-import java.util.Collections;
-import java.util.List;
-
public class KafkaSpoutMessageId {
private transient TopicPartition topicPart;
private transient long offset;
- private transient List<Object> tuple;
private transient int numFails = 0;
- public KafkaSpoutMessageId(ConsumerRecord consumerRecord, List<Object> tuple) {
- this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple);
+ public KafkaSpoutMessageId(ConsumerRecord consumerRecord) {
+ this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
}
- public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) {
+ public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
this.topicPart = topicPart;
this.offset = offset;
- this.tuple = tuple;
}
public int partition() {
@@ -64,10 +59,6 @@ public class KafkaSpoutMessageId {
return topicPart;
}
- public List<Object> getTuple() {
- return Collections.unmodifiableList(tuple);
- }
-
public String getMetadata(Thread currThread) {
return "{" +
"topic-partition=" + topicPart +
@@ -83,7 +74,6 @@ public class KafkaSpoutMessageId {
"topic-partition=" + topicPart +
", offset=" + offset +
", numFails=" + numFails +
- ", tuple=" + tuple +
'}';
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
new file mode 100644
index 0000000..208cef4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
+ * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ */
+public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
+ private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
+
+ private TimeInterval initialDelay;
+ private TimeInterval delayPeriod;
+ private TimeInterval maxDelay;
+ private int maxRetries;
+
+ private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+ private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
+
+ /**
+ * Comparator ordering by timestamp
+ */
+ private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+ public int compare(RetrySchedule entry1, RetrySchedule entry2) {
+ return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+ }
+ }
+
+ private class RetrySchedule {
+ private KafkaSpoutMessageId msgId;
+ private long nextRetryTimeNanos;
+
+ public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
+ this.msgId = msgId;
+ this.nextRetryTimeNanos = nextRetryTime;
+ LOG.debug("Created {}", this);
+ }
+
+ public void setNextRetryTime() {
+ nextRetryTimeNanos = nextTime(msgId);
+ LOG.debug("Updated {}", this);
+ }
+
+ public boolean retry(long currentTimeNanos) {
+ return nextRetryTimeNanos <= currentTimeNanos;
+ }
+
+ @Override
+ public String toString() {
+ return "RetrySchedule{" +
+ "msgId=" + msgId +
+ ", nextRetryTime=" + nextRetryTimeNanos +
+ '}';
+ }
+
+ public KafkaSpoutMessageId msgId() {
+ return msgId;
+ }
+
+ public long nextRetryTimeNanos() {
+ return nextRetryTimeNanos;
+ }
+ }
+
+ public static class TimeInterval implements Serializable {
+ private long lengthNanos;
+ private long length;
+ private TimeUnit timeUnit;
+
+ /**
+ * @param length length of the time interval in the units specified by {@link TimeUnit}
+ * @param timeUnit unit used to specify a time interval on which to specify a time unit
+ */
+ public TimeInterval(long length, TimeUnit timeUnit) {
+ this.length = length;
+ this.timeUnit = timeUnit;
+ this.lengthNanos = timeUnit.toNanos(length);
+ }
+
+ public static TimeInterval seconds(long length) {
+ return new TimeInterval(length, TimeUnit.SECONDS);
+ }
+
+ public static TimeInterval milliSeconds(long length) {
+ return new TimeInterval(length, TimeUnit.MILLISECONDS);
+ }
+
+ public static TimeInterval microSeconds(long length) {
+ return new TimeInterval(length, TimeUnit.MILLISECONDS);
+ }
+
+ public long lengthNanos() {
+ return lengthNanos;
+ }
+
+ public long length() {
+ return length;
+ }
+
+ public TimeUnit timeUnit() {
+ return timeUnit;
+ }
+
+ @Override
+ public String toString() {
+ return "TimeInterval{" +
+ "length=" + length +
+ ", timeUnit=" + timeUnit +
+ '}';
+ }
+ }
+
+ /**
+ * The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
+ * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ *
+ * @param initialDelay initial delay of the first retry
+ * @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
+ * @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
+ * @param maxDelay maximum amount of time waiting before retrying
+ *
+ */
+ public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
+ this.initialDelay = initialDelay;
+ this.delayPeriod = delayPeriod;
+ this.maxRetries = maxRetries;
+ this.maxDelay = maxDelay;
+ LOG.debug("Instantiated {}", this);
+ }
+
+ @Override
+ public Set<TopicPartition> retriableTopicPartitions() {
+ final Set<TopicPartition> tps = new TreeSet<>();
+ final long currentTimeNanos = System.nanoTime();
+ for (RetrySchedule retrySchedule : retrySchedules) {
+ if (retrySchedule.retry(currentTimeNanos)) {
+ final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+ tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+ } else {
+ break; // Stop searching as soon as passed current time
+ }
+ }
+ LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
+ return tps;
+ }
+
+ @Override
+ public boolean isReady(KafkaSpoutMessageId msgId) {
+ boolean retry = false;
+ if (toRetryMsgs.contains(msgId)) {
+ final long currentTimeNanos = System.nanoTime();
+ for (RetrySchedule retrySchedule : retrySchedules) {
+ if (retrySchedule.retry(currentTimeNanos)) {
+ if (retrySchedule.msgId.equals(msgId)) {
+ retry = true;
+ LOG.debug("Found entry to retry {}", retrySchedule);
+ }
+ } else {
+ LOG.debug("Entry to retry not found {}", retrySchedule);
+ break; // Stop searching as soon as passed current time
+ }
+ }
+ }
+ return retry;
+ }
+
+ @Override
+ public boolean isScheduled(KafkaSpoutMessageId msgId) {
+ return toRetryMsgs.contains(msgId);
+ }
+
+ @Override
+ public boolean remove(KafkaSpoutMessageId msgId) {
+ boolean removed = false;
+ if (toRetryMsgs.contains(msgId)) {
+ for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+ final RetrySchedule retrySchedule = iterator.next();
+ if (retrySchedule.msgId().equals(msgId)) {
+ iterator.remove();
+ toRetryMsgs.remove(msgId);
+ removed = true;
+ break;
+ }
+ }
+ }
+ LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
+ LOG.trace("Current state {}", retrySchedules);
+ return removed;
+ }
+
+ @Override
+ public boolean retainAll(Collection<TopicPartition> topicPartitions) {
+ boolean result = false;
+ for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
+ final RetrySchedule retrySchedule = rsIterator.next();
+ final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+ final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition());
+ if (!topicPartitions.contains(tpRetry)) {
+ rsIterator.remove();
+ toRetryMsgs.remove(msgId);
+ LOG.debug("Removed {}", retrySchedule);
+ LOG.trace("Current state {}", retrySchedules);
+ result = true;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void schedule(KafkaSpoutMessageId msgId) {
+ if (msgId.numFails() > maxRetries) {
+ LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+ } else {
+ if (toRetryMsgs.contains(msgId)) {
+ for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+ final RetrySchedule retrySchedule = iterator.next();
+ if (retrySchedule.msgId().equals(msgId)) {
+ iterator.remove();
+ toRetryMsgs.remove(msgId);
+ }
+ }
+ }
+ final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
+ retrySchedules.add(retrySchedule);
+ toRetryMsgs.add(msgId);
+ LOG.debug("Scheduled. {}", retrySchedule);
+ LOG.trace("Current state {}", retrySchedules);
+ }
+ }
+
+ // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
+ private long nextTime(KafkaSpoutMessageId msgId) {
+ final long currentTimeNanos = System.nanoTime();
+ final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
+ ? currentTimeNanos + initialDelay.lengthNanos()
+ : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1));
+ return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaSpoutRetryExponentialBackoff{" +
+ "delay=" + initialDelay +
+ ", ratio=" + delayPeriod +
+ ", maxRetries=" + maxRetries +
+ ", maxRetryDelay=" + maxDelay +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
new file mode 100644
index 0000000..5aab167
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Represents the logic that manages the retrial of failed tuples.
+ */
+public interface KafkaSpoutRetryService extends Serializable {
+ /**
+ * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+ * @param msgId message to schedule for retrial
+ */
+ void schedule(KafkaSpoutMessageId msgId);
+
+ /**
+ * Removes a message from the list of messages scheduled for retrial
+ * @param msgId message to remove from retrial
+ */
+ boolean remove(KafkaSpoutMessageId msgId);
+
+ /**
+ * Retains all the messages whose {@link TopicPartition} belongs to the specified {@code Collection<TopicPartition>}.
+ * All messages that come from a {@link TopicPartition} NOT existing in the collection will be removed.
+ * This method is useful to cleanup state following partition rebalance.
+ * @param topicPartitions Collection of {@link TopicPartition} for which to keep messages
+ * @return true if at least one message was removed, false otherwise
+ */
+ boolean retainAll(Collection<TopicPartition> topicPartitions);
+
+ /**
+ * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
+ * for which a tuple has failed and has retry time less than current time
+ */
+ Set<TopicPartition> retriableTopicPartitions();
+
+ /**
+ * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried,
+ * i.e is scheduled and has retry time that is less than current time.
+ * @return true if message is ready to be retried, false otherwise
+ */
+ boolean isReady(KafkaSpoutMessageId msgId);
+
+ /**
+ * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
+ * The message may or may not be ready to be retried yet.
+ * @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
+ * Returns false is this message is not scheduled for retrial
+ */
+ boolean isScheduled(KafkaSpoutMessageId msgId);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
index 43464a9..064a8bb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -31,27 +31,31 @@ public class KafkaSpoutStream implements Serializable {
private final String streamId;
private final String topic;
- /** Declare specified outputFields with default stream for the specified topic */
+ /** Represents the specified outputFields and topic with the default stream */
KafkaSpoutStream(Fields outputFields, String topic) {
this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
}
- /** Declare specified outputFields with specified stream for the specified topic */
+ /** Represents the specified outputFields and topic with the specified stream */
KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+ if (outputFields == null || streamId == null || topic == null) {
+ throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
+ "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
+ }
this.outputFields = outputFields;
this.streamId = streamId;
this.topic = topic;
}
- public Fields getOutputFields() {
+ Fields getOutputFields() {
return outputFields;
}
- public String getStreamId() {
+ String getStreamId() {
return streamId;
}
- public String getTopic() {
+ String getTopic() {
return topic;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
index 30215d1..dc5892e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -33,7 +33,7 @@ import java.util.List;
import java.util.Map;
/**
- * Represents the output streams associated with each topic, and provides a public API to
+ * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to
* declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
*/
public class KafkaSpoutStreams implements Serializable {
@@ -48,7 +48,7 @@ public class KafkaSpoutStreams implements Serializable {
/**
* @param topic the topic for which to get output fields
- * @return the output fields declared
+ * @return the declared output fields
*/
public Fields getOutputFields(String topic) {
if (topicToStream.containsKey(topic)) {
@@ -79,7 +79,7 @@ public class KafkaSpoutStreams implements Serializable {
return new ArrayList<>(topicToStream.keySet());
}
- void declareOutputFields(OutputFieldsDeclarer declarer) {
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
for (KafkaSpoutStream stream : topicToStream.values()) {
if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
@@ -88,8 +88,8 @@ public class KafkaSpoutStreams implements Serializable {
}
}
- void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) {
- collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
+ public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
+ collector.emit(getStreamId(messageId.topic()), tuple, messageId);
}
@Override
@@ -103,11 +103,11 @@ public class KafkaSpoutStreams implements Serializable {
private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();;
/**
- * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified.
- * All the topics will have the same stream id and output fields.
+ * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified.
+ * All topics will have the same stream id and output fields.
*/
public Builder(Fields outputFields, String... topics) {
- this(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+ addStream(outputFields, topics);
}
/**
@@ -115,16 +115,14 @@ public class KafkaSpoutStreams implements Serializable {
* All the topics will have the same stream id and output fields.
*/
public Builder (Fields outputFields, String streamId, String... topics) {
- for (String topic : topics) {
- topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
- }
+ addStream(outputFields, streamId, topics);
}
/**
* Adds this stream to the state representing the streams associated with each topic
*/
public Builder(KafkaSpoutStream stream) {
- topicToStream.put(stream.getTopic(), stream);
+ addStream(stream);
}
/**
@@ -139,9 +137,7 @@ public class KafkaSpoutStreams implements Serializable {
* Please refer to javadoc in {@link #Builder(Fields, String...)}
*/
public Builder addStream(Fields outputFields, String... topics) {
- for (String topic : topics) {
- topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic));
- }
+ addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
return this;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
index 45aab48..3bb71a8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
@@ -21,8 +21,38 @@ package org.apache.storm.kafka.spout;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-public interface KafkaSpoutTupleBuilder<K,V> extends Serializable {
- List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams);
+/**
+ * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s.
+ * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder}
+ */
+public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable {
+ private List<String> topics;
+
+ /**
+ * @param topics list of topics that use this implementation to build tuples
+ */
+ public KafkaSpoutTupleBuilder(String... topics) {
+ if (topics == null || topics.length == 0) {
+ throw new IllegalArgumentException("Must specify at least one topic. It cannot be null or empty");
+ }
+ this.topics = Arrays.asList(topics);
+ }
+
+ /**
+ * @return list of topics that use this implementation to build tuples
+ */
+ public List<String> getTopics() {
+ return Collections.unmodifiableList(topics);
+ }
+
+ /**
+ * Builds a list of tuples using the ConsumerRecord specified as parameter
+ * @param consumerRecord whose contents are used to build tuples
+ * @return list of tuples
+ */
+ public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
new file mode 100644
index 0000000..d67c69d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s.
+ * The logic is provided by the user by implementing the appropriate number of {@link KafkaSpoutTupleBuilder} instances
+ */
+public class KafkaSpoutTuplesBuilder<K,V> implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilder.class);
+
+ private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+ private KafkaSpoutTuplesBuilder(Builder<K,V> builder) {
+ this.topicToTupleBuilders = builder.topicToTupleBuilders;
+ LOG.debug("Instantiated {}", this);
+ }
+
+ public static class Builder<K,V> {
+ private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
+ private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+ @SafeVarargs
+ public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) {
+ if (tupleBuilders == null || tupleBuilders.length == 0) {
+ throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
+ }
+
+ this.tupleBuilders = Arrays.asList(tupleBuilders);
+ topicToTupleBuilders = new HashMap<>();
+ }
+
+ public KafkaSpoutTuplesBuilder<K,V> build() {
+ for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
+ for (String topic : tupleBuilder.getTopics()) {
+ if (!topicToTupleBuilders.containsKey(topic)) {
+ topicToTupleBuilders.put(topic, tupleBuilder);
+ }
+ }
+ }
+ return new KafkaSpoutTuplesBuilder<>(this);
+ }
+ }
+
+ public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) {
+ final String topic = consumerRecord.topic();
+ return topicToTupleBuilders.get(topic).buildTuple(consumerRecord);
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaSpoutTuplesBuilder{" +
+ "topicToTupleBuilders=" + topicToTupleBuilders +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
new file mode 100644
index 0000000..7a94a50
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KafkaSpoutTestBolt extends BaseRichBolt {
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestBolt.class);
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ LOG.debug("input = [" + input + "]");
+ collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
index 4fcc3ef..0691dd3 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
@@ -22,11 +22,13 @@ import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
@@ -35,9 +37,9 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM;
public class KafkaSpoutTopologyMain {
private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
@@ -80,21 +82,29 @@ public class KafkaSpoutTopologyMain {
public static StormTopology getTopolgyKafkaSpout() {
final TopologyBuilder tp = new TopologyBuilder();
- tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1);
- tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
- tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+ tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+ tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
return tp.createTopology();
}
public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
- return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams)
+ return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
- .setPollStrategy(STREAM)
.setMaxUncommittedOffsets(250)
.build();
}
+ private static KafkaSpoutRetryService getRetryService() {
+ return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, TimeUnit.MICROSECONDS),
+ TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
+ }
+
+ private static TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) {
+ return new TimeInterval(delay, timeUnit);
+ }
+
public static Map<String,Object> getKafkaConsumerProps() {
Map<String, Object> props = new HashMap<>();
// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
@@ -105,16 +115,19 @@ public class KafkaSpoutTopologyMain {
return props;
}
- public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() {
- return new KafkaRecordTupleBuilder<>();
+ public static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+ return new KafkaSpoutTuplesBuilder.Builder<>(
+ new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
+ new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
+ .build();
}
public static KafkaSpoutStreams getKafkaSpoutStreams() {
final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
final Fields outputFields1 = new Fields("topic", "partition", "offset");
return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream
- .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream
- .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream2
+ .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream
+ .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream
.build();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
deleted file mode 100644
index c9ff9d5..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class KafkaTestBolt extends BaseRichBolt {
- protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class);
-
-
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- LOG.debug("input = [" + input + "]");
- collector.ack(input);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
new file mode 100644
index 0000000..ca65177
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+/** Builds tuples holding only the record coordinates (topic, partition, offset); key and value are left out. */
+public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
+    /** @param topics topics whose records are converted into tuples by this builder */
+    public TopicTest2TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        final String topic = consumerRecord.topic();
+        final int partition = consumerRecord.partition();
+        final long offset = consumerRecord.offset();
+        return new Values(topic, partition, offset);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
new file mode 100644
index 0000000..4c55aa1
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+/** Builds tuples carrying the whole record: topic, partition, offset, key and value, in that order. */
+public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
+    /** @param topics topics whose records are converted into tuples by this builder */
+    public TopicsTest0Test1TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        final String topic = consumerRecord.topic();
+        final int partition = consumerRecord.partition();
+        final long offset = consumerRecord.offset();
+        final K key = consumerRecord.key();
+        final V value = consumerRecord.value();
+        return new Values(topic, partition, offset, key, value);
+    }
+}
\ No newline at end of file