You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/31 21:00:08 UTC
[1/3] storm git commit: Added STORM-822 to Changelog
Repository: storm
Updated Branches:
refs/heads/master 955b44552 -> cd0c0f733
Added STORM-822 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd0c0f73
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd0c0f73
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd0c0f73
Branch: refs/heads/master
Commit: cd0c0f73322fbaf81405be6cdaddeab080bd59b1
Parents: 332afc4
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Mar 31 13:25:15 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cd0c0f73/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3462d5d..15ff766 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-822: Kafka Spout New Consumer API
* STORM-1663: Stats couldn't handle null worker HB.
* STORM-1665: Worker cannot instantiate kryo
* STORM-1666: Kill from the UI fails silently
[3/3] storm git commit: STORM-822: Kafka Spout New Consumer API
Posted by bo...@apache.org.
STORM-822: Kafka Spout New Consumer API
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d26b984d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d26b984d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d26b984d
Branch: refs/heads/master
Commit: d26b984d74987c95badbdf8a73c74c4304bd4ec9
Parents: 955b445
Author: Hugo Louro <hm...@gmail.com>
Authored: Mon Dec 14 10:16:42 2015 -0800
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 16 +-
external/flux/flux-examples/pom.xml | 13 +-
external/flux/pom.xml | 13 +-
external/sql/storm-sql-kafka/pom.xml | 16 +-
external/storm-kafka-client/README.md | 9 +
external/storm-kafka-client/pom.xml | 86 ++++
.../kafka/spout/KafkaRecordTupleBuilder.java | 44 ++
.../apache/storm/kafka/spout/KafkaSpout.java | 503 +++++++++++++++++++
.../storm/kafka/spout/KafkaSpoutConfig.java | 298 +++++++++++
.../storm/kafka/spout/KafkaSpoutMessageId.java | 111 ++++
.../storm/kafka/spout/KafkaSpoutStream.java | 66 +++
.../storm/kafka/spout/KafkaSpoutStreams.java | 162 ++++++
.../kafka/spout/KafkaSpoutTupleBuilder.java | 28 ++
.../spout/test/KafkaSpoutTopologyMain.java | 120 +++++
.../storm/kafka/spout/test/KafkaTestBolt.java | 52 ++
external/storm-kafka/pom.xml | 16 +-
external/storm-solr/pom.xml | 8 +-
pom.xml | 57 ++-
storm-dist/binary/src/main/assembly/binary.xml | 14 +
19 files changed, 1544 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 0b5fd97..112f1e8 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -158,26 +158,12 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.1</version>
- <!-- use provided scope, so users can pull in whichever scala version they choose -->
+ <artifactId>${kafka.artifact.id}</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.8.2.1</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/flux/flux-examples/pom.xml
----------------------------------------------------------------------
diff --git a/external/flux/flux-examples/pom.xml b/external/flux/flux-examples/pom.xml
index 28d7239..48cc151 100644
--- a/external/flux/flux-examples/pom.xml
+++ b/external/flux/flux-examples/pom.xml
@@ -95,18 +95,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.1.1</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
+ <artifactId>${kafka.artifact.id}</artifactId>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/flux/pom.xml
----------------------------------------------------------------------
diff --git a/external/flux/pom.xml b/external/flux/pom.xml
index 1fd1683..56d9bab 100644
--- a/external/flux/pom.xml
+++ b/external/flux/pom.xml
@@ -78,19 +78,8 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.1.1</version>
+ <artifactId>${kafka.artifact.id}</artifactId>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/sql/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
index 450611e..0642d17 100644
--- a/external/sql/storm-sql-kafka/pom.xml
+++ b/external/sql/storm-sql-kafka/pom.xml
@@ -63,26 +63,12 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.1</version>
- <!-- use provided scope, so users can pull in whichever scala version they choose -->
+ <artifactId>${kafka.artifact.id}</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.8.2.1</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
new file mode 100644
index 0000000..8ac15f5
--- /dev/null
+++ b/external/storm-kafka-client/README.md
@@ -0,0 +1,9 @@
+# Storm Kafka Spout New Consumer API
+
+This patch is still under development and it comes with no warranties at this moment.
+
+It has not been thoroughly tested, and therefore there may be some bugs and it is not ready for production.
+
+The documentation will be uploaded soon.
+
+To see how to use the new Kafka Spout, please refer to the example under tests. Thank you!
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
new file mode 100644
index 0000000..6c82b6a
--- /dev/null
+++ b/external/storm-kafka-client/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-kafka-client</artifactId>
+ <name>storm-kafka-client</name>
+
+ <packaging>jar</packaging>
+
+ <developers>
+ <developer>
+ <id>hmcl</id>
+ <name>Hugo Louro</name>
+ <email>hmclouro@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <!--parent module dependency-->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!--kafka libraries-->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <!--test dependencies -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
new file mode 100644
index 0000000..4d67632
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> {
+ @Override
+ public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
+ final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic());
+ if (outputFields != null) {
+ if (outputFields.size() == 3) {
+ return new Values(consumerRecord.topic(),
+ consumerRecord.partition(),
+ consumerRecord.offset());
+ } else if (outputFields.size() == 5) {
+ return new Values(consumerRecord.topic(),
+ consumerRecord.partition(),
+ consumerRecord.offset(),
+ consumerRecord.key(),
+ consumerRecord.value());
+ }
+ }
+ throw new RuntimeException("Failed to build tuple. " + consumerRecord + " " + kafkaSpoutStreams);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
new file mode 100644
index 0000000..9a49ee8
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaSpout<K, V> extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+ private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
+
+ // Storm
+ protected SpoutOutputCollector collector;
+
+ // Kafka
+ private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private transient KafkaConsumer<K, V> kafkaConsumer;
+ private transient boolean consumerAutoCommitMode;
+
+
+ // Bookkeeping
+ private KafkaSpoutStreams kafkaSpoutStreams;
+ private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
+ private transient Timer commitTimer; // timer == null for auto commit mode
+ private transient Timer logTimer;
+ private transient Map<TopicPartition, OffsetEntry> acked; // emitted tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate
+ private transient int maxRetries; // Max number of times a tuple is retried
+ private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
+ // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
+ private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
+ private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private transient PollStrategy pollStrategy;
+
+
+ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+ this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
+ this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+ this.tupleBuilder = tupleBuilder;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ initialized = false;
+
+ // Spout internals
+ this.collector = collector;
+ maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+ numUncommittedOffsets = 0;
+ logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
+
+ // Offset management
+ firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+ pollStrategy = kafkaSpoutConfig.getPollStrategy();
+ consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+
+ if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually
+ commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+ acked = new HashMap<>();
+ }
+
+ LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
+ }
+
+ // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+
+ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+ if (!consumerAutoCommitMode && initialized) {
+ initialized = false;
+ commitOffsetsForAckedTuples();
+ }
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+
+ initialize(partitions);
+ }
+
+ private void initialize(Collection<TopicPartition> partitions) {
+ if (!consumerAutoCommitMode) {
+ acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
+ }
+
+ for (TopicPartition tp : partitions) {
+ final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+ final long fetchOffset = doSeek(tp, committedOffset);
+ setAcked(tp, fetchOffset);
+ }
+ initialized = true;
+ LOG.debug("Initialization complete");
+ }
+
+ /**
+ * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+ */
+ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
+ long fetchOffset;
+ if (committedOffset != null) { // offset was committed for this TopicPartition
+ if (firstPollOffsetStrategy.equals(EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else {
+ // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
+ fetchOffset = committedOffset.offset() + 1;
+ kafkaConsumer.seek(tp, fetchOffset);
+ }
+ } else { // no commits have ever been done, so start at the beginning or end depending on the strategy
+ if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ }
+ fetchOffset = kafkaConsumer.position(tp);
+ }
+ return fetchOffset;
+ }
+ }
+
+ private void setAcked(TopicPartition tp, long fetchOffset) {
+ // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+ if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
+ acked.put(tp, new OffsetEntry(tp, fetchOffset));
+ }
+ }
+
+ // ======== Next Tuple =======
+
+ @Override
+ public void nextTuple() {
+ if (initialized) {
+ if (commit()) {
+ commitOffsetsForAckedTuples();
+ } else if (poll()) {
+ emitTuples(pollKafkaBroker());
+ } else if (logTimer.isExpiredResetOnTrue()) { // to limit the number of messages that get printed.
+ log();
+ }
+ } else {
+ LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+ }
+ }
+
+ /**
+  * Traces why no poll is taking place, with a message specific to the configured poll strategy.
+  * Called from {@link #nextTuple()} only when the log timer has expired, to limit the number of messages printed.
+  *
+  * @throws IllegalStateException if the poll strategy has no message defined here
+  */
+ private void log() {
+     switch(pollStrategy) {
+         case STREAM:
+             LOG.trace("Reached the maximum number of uncommitted records [{}]. " +
+                     "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ",
+                     numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets());
+             break;
+         case BATCH:
+             LOG.trace("No more polls will occur until the last batch completes. [{}] emitted tuples pending", numUncommittedOffsets);
+             break;
+         default:
+             throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
+     }
+ }
+
+ // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory
+ private boolean poll() {
+ switch(pollStrategy) {
+ case STREAM:
+ return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
+ case BATCH:
+ return consumerAutoCommitMode || numUncommittedOffsets <= 0;
+ default:
+ throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
+ }
+ }
+
+ private boolean commit() {
+ return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
+ }
+
+ private ConsumerRecords<K, V> pollKafkaBroker() {
+ final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+ final int numPolledRecords = consumerRecords.count();
+ numUncommittedOffsets+= numPolledRecords;
+ LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
+ return consumerRecords;
+ }
+
+ /**
+  * Emits one tuple for every record returned by a poll, walking the result partition by partition.
+  *
+  * @param consumerRecords the records returned by the last poll to the Kafka broker
+  */
+ private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
+     for (TopicPartition tp : consumerRecords.partitions()) {
+         // Fetch the records of this partition only. Fetching by topic here would return the records of every
+         // partition of the topic on each iteration, emitting each record once per polled partition of that topic.
+         final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
+
+         for (final ConsumerRecord<K, V> record : records) {
+             final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams);
+             final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple);
+
+             kafkaSpoutStreams.emit(collector, messageId); // emits one tuple per record
+             LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+         }
+     }
+ }
+
+ private void commitOffsetsForAckedTuples() {
+ // Find offsets that are ready to be committed for every topic partition
+ final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+ final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
+ if (nextCommitOffset != null) {
+ nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
+ }
+ }
+
+ // Commit offsets that are ready to be committed for every topic partition
+ if (!nextCommitOffsets.isEmpty()) {
+ kafkaConsumer.commitSync(nextCommitOffsets);
+ LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
+ // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
+ // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
+ for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+ final OffsetEntry offsetEntry = tpOffset.getValue();
+ offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
+ }
+ } else {
+ LOG.trace("No offsets to commit. {}", this);
+ }
+ }
+
+ // ======== Ack =======
+
+ /**
+  * Registers an acked tuple so that its offset can be committed by a later call to
+  * {@code commitOffsetsForAckedTuples()}. Nothing is tracked in auto commit mode.
+  *
+  * @param messageId the {@link KafkaSpoutMessageId} of the tuple that was acked
+  */
+ @Override
+ public void ack(Object messageId) {
+     if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
+         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+         final OffsetEntry offsetEntry = acked.get(msgId.getTopicPartition());
+         if (offsetEntry == null) {
+             // The rebalance listener drops the entries of partitions that are no longer assigned to this spout,
+             // so an ack arriving afterwards has no entry to be recorded in and there is nothing left to commit.
+             LOG.debug("Ignoring ack for message [{}] because its partition has no offset entry in this spout", msgId);
+             return;
+         }
+         offsetEntry.add(msgId);
+         LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
+     }
+ }
+
+ // ======== Fail =======
+
+ @Override
+ public void fail(Object messageId) {
+ final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+ if (msgId.numFails() < maxRetries) {
+ msgId.incrementNumFails();
+ kafkaSpoutStreams.emit(collector, msgId);
+ LOG.trace("Retried tuple with message id [{}]", msgId);
+ } else { // limit to max number of retries
+ LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
+ ack(msgId);
+ }
+ }
+
+ // ======== Activate / Deactivate / Close / Declare Outputs =======
+
+ @Override
+ public void activate() {
+ subscribeKafkaConsumer();
+ }
+
+ private void subscribeKafkaConsumer() {
+ kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener());
+ // Initial poll to get the consumer registration process going.
+ // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
+ kafkaConsumer.poll(0);
+ }
+
+ @Override
+ public void deactivate() {
+ shutdown();
+ }
+
+ @Override
+ public void close() {
+ shutdown();
+ }
+
+ /**
+  * Commits the offsets of the acked tuples (unless in auto commit mode) and closes the Kafka consumer.
+  * Shared by {@link #deactivate()} and {@link #close()}.
+  */
+ private void shutdown() {
+     if (kafkaConsumer == null) { // the consumer is only created in activate(); nothing to commit or close before that
+         return;
+     }
+     try {
+         // Do not call kafkaConsumer.wakeup() here: with no poll blocked on this thread, the wakeup would be
+         // raised by the next blocking call, i.e. the synchronous commit below, and the final offsets would be lost.
+         if (!consumerAutoCommitMode) {
+             commitOffsetsForAckedTuples();
+         }
+     } finally {
+         //remove resources
+         kafkaConsumer.close();
+     }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ kafkaSpoutStreams.declareOutputFields(declarer);
+ }
+
+ @Override
+ public String toString() {
+ return "{acked=" + acked + "} ";
+ }
+
+ // ======= Offsets Commit Management ==========
+
+ private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
+ public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
+ return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
+ }
+ }
+
+ /**
+ * This class is not thread safe
+ */
+ private class OffsetEntry {
+ private final TopicPartition tp;
+ private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
+ * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
+ private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
+ private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); // acked messages sorted by ascending order of offset
+
+ public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
+ this.tp = tp;
+ this.initialFetchOffset = initialFetchOffset;
+ this.committedOffset = initialFetchOffset - 1;
+ LOG.debug("Created OffsetEntry. {}", this);
+ }
+
+ public void add(KafkaSpoutMessageId msgId) { // O(Log N)
+ ackedMsgs.add(msgId);
+ }
+
+ /**
+ * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
+ */
+ public OffsetAndMetadata findNextCommitOffset() {
+ boolean found = false;
+ long currOffset;
+ long nextCommitOffset = committedOffset;
+ KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
+
+ for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
+ if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) { // found the next offset to commit
+ found = true;
+ nextCommitMsg = currAckedMsg;
+ nextCommitOffset = currOffset;
+ LOG.trace("Found offset to commit [{}]. {}", currOffset, this);
+ } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
+ LOG.debug("Non continuous offset found [{}]. It will be processed in a subsequent batch. {}", currOffset, this);
+ break;
+ } else {
+ LOG.debug("Unexpected offset found [{}]. {}", currOffset, this);
+ break;
+ }
+ }
+
+ OffsetAndMetadata nextCommitOffsetAndMetadata = null;
+ if (found) {
+ nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
+ LOG.debug("Offset to be committed next: [{}] {}", nextCommitOffsetAndMetadata.offset(), this);
+ } else {
+ LOG.debug("No offsets ready to commit. {}", this);
+ }
+ return nextCommitOffsetAndMetadata;
+ }
+
+ /**
+ * Marks an offset as committed. This method has side effects - it sets the internal state in such a way that future
+ * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any.
+ *
+ * @param committedOffset offset to be marked as committed
+ */
+ public void commit(OffsetAndMetadata committedOffset) {
+ if (committedOffset != null) {
+ final long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+ this.committedOffset = committedOffset.offset();
+ for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
+ if (iterator.next().offset() <= committedOffset.offset()) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+ numUncommittedOffsets-= numCommittedOffsets;
+ }
+ LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", this, numUncommittedOffsets);
+ }
+
+ public boolean isEmpty() {
+ return ackedMsgs.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return "OffsetEntry{" +
+ "topic-partition=" + tp +
+ ", fetchOffset=" + initialFetchOffset +
+ ", committedOffset=" + committedOffset +
+ ", ackedMsgs=" + ackedMsgs +
+ '}';
+ }
+ }
+
+ // =========== Timer ===========
+
+ private class Timer {
+ private final long delay;
+ private final long period;
+ private final TimeUnit timeUnit;
+ private final long periodNanos;
+ private long start;
+
+ /**
+ * Creates a class that mimics a single threaded timer that expires periodically. If a call to {@link
+ * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns
+ * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay.
+ *
+ * @param delay the initial delay before the timer starts
+ * @param period the period between calls {@link #isExpiredResetOnTrue()}
+ * @param timeUnit the time unit of delay and period
+ */
+ public Timer(long delay, long period, TimeUnit timeUnit) {
+ this.delay = delay;
+ this.period = period;
+ this.timeUnit = timeUnit;
+
+ periodNanos = timeUnit.toNanos(period);
+ start = System.nanoTime() + timeUnit.toNanos(delay);
+ }
+
+ public long period() {
+ return period;
+ }
+
+ public long delay() {
+ return delay;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
+ /**
+ * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
+ * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
+ * (re-initiated) and a new cycle will start.
+ *
+ * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
+ * otherwise.
+ */
+ public boolean isExpiredResetOnTrue() {
+ final boolean expired = System.nanoTime() - start > periodNanos;
+ if (expired) {
+ start = System.nanoTime();
+ }
+ return expired;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
new file mode 100644
index 0000000..d969f1f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
+ */
+public class KafkaSpoutConfig<K, V> implements Serializable {
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;            // 2s
+    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;   // 15s
+    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
+    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;    // 10,000 records
+
+    /** Names of the Kafka consumer properties that this configuration reads or sets. */
+    public interface Consumer {
+        String GROUP_ID = "group.id";
+        String BOOTSTRAP_SERVERS = "bootstrap.servers";
+        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
+        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
+        String KEY_DESERIALIZER = "key.deserializer";
+        String VALUE_DESERIALIZER = "value.deserializer";
+    }
+
+    /**
+     * The offset used by the Kafka spout in the first poll to the Kafka broker. The choice of this parameter will
+     * affect the number of consumer records returned in the first poll. By default this parameter is set to
+     * UNCOMMITTED_EARLIEST. <br><br>
+     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br>
+     * <ul>
+     * <li>EARLIEST means that the kafka spout polls records starting at the first offset of the partition,
+     * regardless of previous commits</li>
+     * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition,
+     * regardless of previous commits</li>
+     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
+     * If no offset has been committed, it behaves as EARLIEST.</li>
+     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
+     * If no offset has been committed, it behaves as LATEST.</li>
+     * </ul>
+     */
+    public enum FirstPollOffsetStrategy {
+        EARLIEST,
+        LATEST,
+        UNCOMMITTED_EARLIEST,
+        UNCOMMITTED_LATEST
+    }
+
+    /**
+     * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput
+     * and the memory footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have
+     * higher throughput and use more memory (it stores in memory the entire KafkaRecord, including data). BATCH will
+     * likely have less throughput but also use less memory. The BATCH behavior is similar to the behavior of the
+     * previous Kafka Spout. The default value is STREAM.
+     * <ul>
+     * <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the
+     * maxUncommittedOffsets threshold has not yet been reached. When the threshold is reached, no more records are
+     * polled until enough offsets have been committed, such that the number of pending offsets is less than
+     * maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
+     * </li>
+     * <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll
+     * have been acked.</li>
+     * </ul>
+     */
+    public enum PollStrategy {
+        STREAM,
+        BATCH
+    }
+
+    // Kafka consumer configuration
+    private final Map<String, Object> kafkaProps;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final long pollTimeoutMs;
+
+    // Kafka spout configuration
+    private final long offsetCommitPeriodMs;
+    private final int maxRetries;
+    private final int maxUncommittedOffsets;
+    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final PollStrategy pollStrategy;
+    private final KafkaSpoutStreams kafkaSpoutStreams;
+
+    /** Copies every setting out of the builder; instances are only created through {@link Builder#build()}. */
+    private KafkaSpoutConfig(Builder<K,V> builder) {
+        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+        this.keyDeserializer = builder.keyDeserializer;
+        this.valueDeserializer = builder.valueDeserializer;
+        this.pollTimeoutMs = builder.pollTimeoutMs;
+        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+        this.maxRetries = builder.maxRetries;
+        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+        this.pollStrategy = builder.pollStrategy;
+        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
+        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+    }
+
+    /**
+     * Fills in defaults for properties the caller did not specify and returns the same map instance.
+     * Note that the map handed to the builder is modified in place: when {@code enable.auto.commit} is absent
+     * it is set to {@code "false"}.
+     */
+    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+        // set defaults for properties not specified
+        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
+            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+        }
+        return kafkaProps;
+    }
+
+    /**
+     * Builder of {@link KafkaSpoutConfig}. The Kafka properties and the streams are mandatory; every other
+     * setting starts with the corresponding {@code DEFAULT_*} constant or documented default.
+     */
+    public static class Builder<K,V> {
+        private final Map<String, Object> kafkaProps;
+        private Deserializer<K> keyDeserializer;
+        private Deserializer<V> valueDeserializer;
+        private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
+        private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
+        private int maxRetries = DEFAULT_MAX_RETRIES;
+        private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+        private final KafkaSpoutStreams kafkaSpoutStreams;
+        private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+        private PollStrategy pollStrategy = PollStrategy.STREAM;
+
+        /**
+         * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as
+         * the subscribing topics. The optional configuration can be specified using the set methods of this builder.
+         *
+         * @param kafkaProps properties defining consumer connection to Kafka broker as specified in
+         *     <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
+         * @param kafkaSpoutStreams streams to where the tuples are emitted for each topic. Multiple topics can emit
+         *     in the same stream.
+         * @throws IllegalArgumentException if {@code kafkaProps} is null or empty, or {@code kafkaSpoutStreams} is null
+         */
+        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) {
+            if (kafkaProps == null || kafkaProps.isEmpty()) {
+                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps);
+            }
+
+            if (kafkaSpoutStreams == null) {
+                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream.");
+            }
+            this.kafkaProps = kafkaProps;
+            this.kafkaSpoutStreams = kafkaSpoutStreams;
+        }
+
+        /**
+         * Specifying this key deserializer overrides the property key.deserializer
+         */
+        public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+            this.keyDeserializer = keyDeserializer;
+            return this;
+        }
+
+        /**
+         * Specifying this value deserializer overrides the property value.deserializer
+         */
+        public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+            this.valueDeserializer = valueDeserializer;
+            return this;
+        }
+
+        /**
+         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
+         * @param pollTimeoutMs time in ms
+         */
+        public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
+            this.pollTimeoutMs = pollTimeoutMs;
+            return this;
+        }
+
+        /**
+         * Specifies the period, in milliseconds, at which the offset commit task is called. Default is 15s.
+         * @param offsetCommitPeriodMs time in ms
+         */
+        public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+            this.offsetCommitPeriodMs = offsetCommitPeriodMs;
+            return this;
+        }
+
+        /**
+         * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means
+         * that no new records are committed until the previous polled records have been acked. This guarantees
+         * at-least-once delivery of all the previously polled records.
+         * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the
+         * previous polled records in favor of processing more records.
+         * @param maxRetries max number of retrials
+         */
+        public Builder<K,V> setMaxRetries(int maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        /**
+         * Defines the max number of polled offsets (records) that can be pending commit, before another poll can
+         * take place. Once this limit is reached, no more offsets (records) can be polled until the next successful
+         * commit(s) sets the number of pending offsets below the threshold. The default is
+         * {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+         * @param maxUncommittedOffsets max number of records that can be pending commit
+         */
+        public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+            this.maxUncommittedOffsets = maxUncommittedOffsets;
+            return this;
+        }
+
+        /**
+         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
+         * Please refer to the documentation in {@link FirstPollOffsetStrategy}
+         * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
+         */
+        public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+            this.firstPollOffsetStrategy = firstPollOffsetStrategy;
+            return this;
+        }
+
+        /**
+         * Sets the strategy used by the Kafka spout to decide when to poll the next batch of records from Kafka.
+         * Please refer to the documentation in {@link PollStrategy}
+         * @param pollStrategy strategy used to decide when to poll
+         */
+        public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) {
+            this.pollStrategy = pollStrategy;
+            return this;
+        }
+
+        /** Creates the configuration from the current state of this builder. */
+        public KafkaSpoutConfig<K,V> build() {
+            return new KafkaSpoutConfig<>(this);
+        }
+    }
+
+    public Map<String, Object> getKafkaProps() {
+        return kafkaProps;
+    }
+
+    public Deserializer<K> getKeyDeserializer() {
+        return keyDeserializer;
+    }
+
+    public Deserializer<V> getValueDeserializer() {
+        return valueDeserializer;
+    }
+
+    public long getPollTimeoutMs() {
+        return pollTimeoutMs;
+    }
+
+    public long getOffsetsCommitPeriodMs() {
+        return offsetCommitPeriodMs;
+    }
+
+    /**
+     * Tells whether the consumer is configured to auto-commit offsets.
+     *
+     * <p>The property value may be supplied either as a {@code String} ("true"/"false") or as a {@code Boolean};
+     * it is converted through its string form so that a non-String value no longer fails with a
+     * {@code ClassCastException}.
+     *
+     * @return true if {@code enable.auto.commit} maps to no value (null) or to a value whose string form equals
+     *     "true" ignoring case; false otherwise
+     */
+    public boolean isConsumerAutoCommitMode() {
+        final Object autoCommit = kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT);
+        // A missing value is treated as "true". The constructor stores "false" when the key is absent, so null is
+        // only seen if the key was explicitly mapped to null or the map was modified after construction.
+        return autoCommit == null || Boolean.parseBoolean(autoCommit.toString());
+    }
+
+    public String getConsumerGroupId() {
+        return (String) kafkaProps.get(Consumer.GROUP_ID);
+    }
+
+    /** @return a new list holding the topics known to the configured {@link KafkaSpoutStreams} */
+    public List<String> getSubscribedTopics() {
+        return new ArrayList<>(kafkaSpoutStreams.getTopics());
+    }
+
+    public int getMaxTupleRetries() {
+        return maxRetries;
+    }
+
+    public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
+        return firstPollOffsetStrategy;
+    }
+
+    public KafkaSpoutStreams getKafkaSpoutStreams() {
+        return kafkaSpoutStreams;
+    }
+
+    public int getMaxUncommittedOffsets() {
+        return maxUncommittedOffsets;
+    }
+
+    public PollStrategy getPollStrategy() {
+        return pollStrategy;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutConfig{" +
+                "kafkaProps=" + kafkaProps +
+                ", keyDeserializer=" + keyDeserializer +
+                ", valueDeserializer=" + valueDeserializer +
+                ", topics=" + getSubscribedTopics() +
+                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
+                ", pollTimeoutMs=" + pollTimeoutMs +
+                ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
+                ", maxRetries=" + maxRetries +
+                ", maxUncommittedOffsets=" + maxUncommittedOffsets +
+                ", pollStrategy=" + pollStrategy +
+                '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
new file mode 100644
index 0000000..0a6b126
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.List;
+
+public class KafkaSpoutMessageId {
+    // NOTE(review): these fields are declared transient although this class does not implement Serializable
+    // in the visible code; the modifier is kept unchanged - confirm whether it is intentional.
+    private transient TopicPartition topicPart;
+    private transient long offset;
+    private transient List<Object> tuple;
+    private transient int numFails = 0;
+
+    /**
+     * Creates a message id from the topic, partition and offset of the given record.
+     *
+     * @param consumerRecord record whose topic, partition and offset identify the message
+     * @param tuple values associated with this message
+     */
+    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, List<Object> tuple) {
+        this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple);
+    }
+
+    /**
+     * @param topicPart topic-partition the message belongs to
+     * @param offset offset of the message within the topic-partition
+     * @param tuple values associated with this message
+     */
+    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) {
+        this.topicPart = topicPart;
+        this.offset = offset;
+        this.tuple = tuple;
+    }
+
+    public int partition() {
+        return topicPart.partition();
+    }
+
+    public String topic() {
+        return topicPart.topic();
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    /** @return how many times {@link #incrementNumFails()} has been called on this instance */
+    public int numFails() {
+        return numFails;
+    }
+
+    public void incrementNumFails() {
+        ++numFails;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPart;
+    }
+
+    /** @return a read-only view of the tuple values passed to the constructor */
+    public List<Object> getTuple() {
+        return Collections.unmodifiableList(tuple);
+    }
+
+    /**
+     * Like {@link #toString()}, but leaves out the tuple values and adds the name of the given thread.
+     *
+     * @param currThread thread whose name is included in the returned text
+     */
+    public String getMetadata(Thread currThread) {
+        return "{" +
+                "topic-partition=" + topicPart +
+                ", offset=" + offset +
+                ", numFails=" + numFails +
+                ", thread='" + currThread.getName() + "'" +
+                '}';
+    }
+
+    @Override
+    public String toString() {
+        return "{" +
+                "topic-partition=" + topicPart +
+                ", offset=" + offset +
+                ", numFails=" + numFails +
+                ", tuple=" + tuple +
+                '}';
+    }
+
+    /**
+     * Two message ids are equal when they have the same topic-partition and offset; the tuple values and the
+     * failure count do not take part in the comparison.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaSpoutMessageId messageId = (KafkaSpoutMessageId) o;
+        if (offset != messageId.offset) {
+            return false;
+        }
+        return topicPart.equals(messageId.topicPart);
+    }
+
+    /** Consistent with {@link #equals(Object)}: derived from the topic-partition and the offset only. */
+    @Override
+    public int hashCode() {
+        int result = topicPart.hashCode();
+        result = 31 * result + (int) (offset ^ (offset >>> 32));
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
new file mode 100644
index 0000000..43464a9
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+
+/**
+ * Represents the stream and output fields used by a topic
+ */
+public class KafkaSpoutStream implements Serializable {
+    private final Fields outputFields;
+    private final String streamId;
+    private final String topic;
+
+    /** Associates the topic with the given output fields on the default stream ({@code Utils.DEFAULT_STREAM_ID}). */
+    KafkaSpoutStream(Fields outputFields, String topic) {
+        this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
+    }
+
+    /** Associates the topic with the given output fields on the given stream. */
+    KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+        this.topic = topic;
+        this.streamId = streamId;
+        this.outputFields = outputFields;
+    }
+
+    /** @return the output fields given at construction */
+    public Fields getOutputFields() {
+        return this.outputFields;
+    }
+
+    /** @return the stream id given at construction, or the default stream id if none was given */
+    public String getStreamId() {
+        return this.streamId;
+    }
+
+    /** @return the topic given at construction */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("KafkaSpoutStream{");
+        text.append("outputFields=").append(outputFields);
+        text.append(", streamId='").append(streamId).append('\'');
+        text.append(", topic='").append(topic).append('\'');
+        return text.append('}').toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
new file mode 100644
index 0000000..30215d1
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+    protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreams.class);
+
+    private final Map<String, KafkaSpoutStream> topicToStream;
+
+    private KafkaSpoutStreams(Builder builder) {
+        // Take a copy so that later addStream calls on the same builder cannot alter this instance.
+        this.topicToStream = new HashMap<>(builder.topicToStream);
+        LOG.debug("Built {}", this);
+    }
+
+    /**
+     * @param topic the topic for which to get output fields
+     * @return the output fields declared
+     * @throws IllegalStateException if no stream has been configured for the topic
+     */
+    public Fields getOutputFields(String topic) {
+        final Fields outputFields = getStream(topic).getOutputFields();
+        LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields);
+        return outputFields;
+    }
+
+    /**
+     * @param topic the topic for which to get the stream id
+     * @return the id of the stream to where the tuples are emitted
+     * @throws IllegalStateException if no stream has been configured for the topic
+     */
+    public String getStreamId(String topic) {
+        final String streamId = getStream(topic).getStreamId();
+        LOG.trace("Topic [{}] emitting in stream [{}]", topic, streamId);
+        return streamId;
+    }
+
+    /**
+     * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}
+     */
+    public List<String> getTopics() {
+        return new ArrayList<>(topicToStream.keySet());
+    }
+
+    /**
+     * Declares every configured stream on the declarer, skipping stream ids the declarer already knows.
+     * The declarer is cast to {@link OutputFieldsGetter} in order to read the streams declared so far.
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (KafkaSpoutStream stream : topicToStream.values()) {
+            // Several topics may share one stream id; declare each stream id only once.
+            if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
+                declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
+                LOG.debug("Declared {}", stream);
+            }
+        }
+    }
+
+    /** Emits the tuple held by the message id on the stream configured for the message's topic. */
+    void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) {
+        collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
+    }
+
+    /** Looks up the stream configured for the topic, failing when the topic is unknown. */
+    private KafkaSpoutStream getStream(String topic) {
+        final KafkaSpoutStream stream = topicToStream.get(topic);
+        if (stream == null) {
+            throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+        }
+        return stream;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutStreams{" +
+                "topicToStream=" + topicToStream +
+                '}';
+    }
+
+    /**
+     * Builder of {@link KafkaSpoutStreams}. Each topic maps to exactly one stream: registering a topic a second
+     * time replaces its earlier stream.
+     */
+    public static class Builder {
+        private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();
+
+        /**
+         * Creates a {@link KafkaSpoutStream} on the default stream for each topic specified.
+         * All the topics will have the same stream id and output fields.
+         */
+        public Builder(Fields outputFields, String... topics) {
+            this(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+        }
+
+        /**
+         * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified.
+         * All the topics will have the same stream id and output fields.
+         */
+        public Builder (Fields outputFields, String streamId, String... topics) {
+            for (String topic : topics) {
+                topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
+            }
+        }
+
+        /**
+         * Adds this stream to the state representing the streams associated with each topic
+         */
+        public Builder(KafkaSpoutStream stream) {
+            topicToStream.put(stream.getTopic(), stream);
+        }
+
+        /**
+         * Adds this stream to the state representing the streams associated with each topic
+         */
+        public Builder addStream(KafkaSpoutStream stream) {
+            topicToStream.put(stream.getTopic(), stream);
+            return this;
+        }
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Fields, String...)}
+         */
+        public Builder addStream(Fields outputFields, String... topics) {
+            for (String topic : topics) {
+                topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic));
+            }
+            return this;
+        }
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Fields, String, String...)}
+         */
+        public Builder addStream(Fields outputFields, String streamId, String... topics) {
+            for (String topic : topics) {
+                topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
+            }
+            return this;
+        }
+
+        /** Creates the {@link KafkaSpoutStreams} from the streams registered so far. */
+        public KafkaSpoutStreams build() {
+            return new KafkaSpoutStreams(this);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
new file mode 100644
index 0000000..45aab48
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable strategy that turns a Kafka {@link ConsumerRecord} into a list of tuple values.
+ *
+ * @param <K> key type of the consumer records handled
+ * @param <V> value type of the consumer records handled
+ */
+public interface KafkaSpoutTupleBuilder<K,V> extends Serializable {
+ /**
+ * Builds the tuple values for one consumer record.
+ * NOTE(review): presumably the returned list has to line up with the output fields declared for the record's
+ * topic in {@code kafkaSpoutStreams} - confirm against the implementations.
+ *
+ * @param consumerRecord the record to convert
+ * @param kafkaSpoutStreams the streams configuration handed to the builder
+ * @return the values of the tuple
+ */
+ List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
new file mode 100644
index 0000000..4fcc3ef
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutStreams;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM;
+
+public class KafkaSpoutTopologyMain {
+ private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
+ private static final String[] TOPICS = new String[]{"test","test1","test2"};
+
+
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
+ } else {
+ submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig());
+ }
+ }
+
+ protected static void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, topology);
+ stopWaitingForInput();
+ }
+
+ protected static void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception {
+ StormSubmitter.submitTopology(arg, config, topology);
+ }
+
+ private static void stopWaitingForInput() {
+ try {
+ System.out.println("PRESS ENTER TO STOP");
+ new BufferedReader(new InputStreamReader(System.in)).readLine();
+ System.exit(0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected static Config getConfig() {
+ Config config = new Config();
+ config.setDebug(true);
+ return config;
+ }
+
+ public static StormTopology getTopolgyKafkaSpout() {
+ final TopologyBuilder tp = new TopologyBuilder();
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1);
+ tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+ tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+ return tp.createTopology();
+ }
+
+ public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
+ return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams)
+ .setOffsetCommitPeriodMs(10_000)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setPollStrategy(STREAM)
+ .setMaxUncommittedOffsets(250)
+ .build();
+ }
+
+ public static Map<String,Object> getKafkaConsumerProps() {
+ Map<String, Object> props = new HashMap<>();
+// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
+ props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
+ props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+ props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+ public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() {
+ return new KafkaRecordTupleBuilder<>();
+ }
+
+ public static KafkaSpoutStreams getKafkaSpoutStreams() {
+ final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+ final Fields outputFields1 = new Fields("topic", "partition", "offset");
+ return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream
+ .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream
+ .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream2
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
new file mode 100644
index 0000000..c9ff9d5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KafkaTestBolt extends BaseRichBolt {
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class);
+
+
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ LOG.debug("input = [" + input + "]");
+ collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 43b7796..763c15f 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -118,26 +118,12 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.1</version>
- <!-- use provided scope, so users can pull in whichever scala version they choose -->
+ <artifactId>${kafka.artifact.id}</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.8.2.1</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-solr/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml
index d093ae8..ba79ddc 100644
--- a/external/storm-solr/pom.xml
+++ b/external/storm-solr/pom.xml
@@ -31,10 +31,10 @@
<developers>
<developer>
- <id>Hugo-Louro</id>
- <name>Hugo Louro</name>
- <email>hmclouro@gmail.com</email>
- </developer>
+ <id>hmcl</id>
+ <name>Hugo Louro</name>
+ <email>hmclouro@gmail.com</email>
+ </developer>
</developers>
<dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 12e5a9f..1a899b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,7 +82,7 @@
<roles>
<role>Committer</role>
</roles>
- <timezone />
+ <timezone/>
</developer>
<developer>
<id>afeng</id>
@@ -109,7 +109,7 @@
<roles>
<role>Committer</role>
</roles>
- <timezone />
+ <timezone/>
</developer>
<developer>
<id>jjackson</id>
@@ -249,6 +249,8 @@
<calcite.version>1.4.0-incubating</calcite.version>
<jackson.version>2.6.3</jackson.version>
<maven-surefire.version>2.18.1</maven-surefire.version>
+ <kafka.version>0.9.0.1</kafka.version>
+ <kafka.artifact.id>kafka_2.11</kafka.artifact.id>
<!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
<java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
@@ -282,6 +284,7 @@
<module>external/storm-mongodb</module>
<module>examples/storm-starter</module>
<module>storm-clojure</module>
+ <module>external/storm-kafka-client</module>
</modules>
<dependencies>
@@ -673,14 +676,14 @@
<version>${ring-json.version}</version>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- <version>${jetty.version}</version>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.version}</version>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlets</artifactId>
- <version>${jetty.version}</version>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlets</artifactId>
+ <version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -831,7 +834,7 @@
<version>${thrift.version}</version>
<scope>compile</scope>
</dependency>
- <!-- used by examples/storm-starter -->
+ <!-- used by examples/storm-starter -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -839,14 +842,38 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-core</artifactId>
- <version>${calcite.version}</version>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>${calcite.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>${kafka.artifact.id}</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>uk.org.lidalia</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 7f0da6f..648640e 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -317,6 +317,20 @@
<include>README.*</include>
</includes>
</fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-kafka-client/target</directory>
+ <outputDirectory>external/storm-kafka-client</outputDirectory>
+ <includes>
+ <include>storm*jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-kafka-client</directory>
+ <outputDirectory>external/storm-kafka-client</outputDirectory>
+ <includes>
+ <include>README.*</include>
+ </includes>
+ </fileSet>
<!-- $STORM_HOME/extlib -->
<fileSet>
[2/3] storm git commit: STORM-822: Kafka Spout New Consumer API -
Refactored code to avoid keeping records data inside spout state - Refactored
code to specify output fields per stream and build tuples per topic -
Implement exponential backoff retry strategy
Posted by bo...@apache.org.
STORM-822: Kafka Spout New Consumer API
- Refactored code to avoid keeping records data inside spout state
- Refactored code to specify output fields per stream and build tuples per topic
- Implement exponential backoff retry strategy
- Send one tuple per call to nextTuple
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/332afc40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/332afc40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/332afc40
Branch: refs/heads/master
Commit: 332afc40d8ee7c1f4a4a747280ff83a92c279c5d
Parents: d26b984
Author: Hugo Louro <hm...@gmail.com>
Authored: Mon Mar 21 14:42:50 2016 -0700
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500
----------------------------------------------------------------------
.../kafka/spout/KafkaRecordTupleBuilder.java | 44 ---
.../apache/storm/kafka/spout/KafkaSpout.java | 160 +++++++----
.../storm/kafka/spout/KafkaSpoutConfig.java | 87 +++---
.../storm/kafka/spout/KafkaSpoutMessageId.java | 16 +-
.../KafkaSpoutRetryExponentialBackoff.java | 281 +++++++++++++++++++
.../kafka/spout/KafkaSpoutRetryService.java | 72 +++++
.../storm/kafka/spout/KafkaSpoutStream.java | 14 +-
.../storm/kafka/spout/KafkaSpoutStreams.java | 26 +-
.../kafka/spout/KafkaSpoutTupleBuilder.java | 34 ++-
.../kafka/spout/KafkaSpoutTuplesBuilder.java | 82 ++++++
.../kafka/spout/test/KafkaSpoutTestBolt.java | 50 ++++
.../spout/test/KafkaSpoutTopologyMain.java | 37 ++-
.../storm/kafka/spout/test/KafkaTestBolt.java | 52 ----
.../spout/test/TopicTest2TupleBuilder.java | 40 +++
.../test/TopicsTest0Test1TupleBuilder.java | 42 +++
15 files changed, 798 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
deleted file mode 100644
index 4d67632..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> {
- @Override
- public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
- final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic());
- if (outputFields != null) {
- if (outputFields.size() == 3) {
- return new Values(consumerRecord.topic(),
- consumerRecord.partition(),
- consumerRecord.offset());
- } else if (outputFields.size() == 5) {
- return new Values(consumerRecord.topic(),
- consumerRecord.partition(),
- consumerRecord.offset(),
- consumerRecord.key(),
- consumerRecord.value());
- }
- }
- throw new RuntimeException("Failed to build tuple. " + consumerRecord + " " + kafkaSpoutStreams);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9a49ee8..d211ae9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -34,12 +33,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@@ -62,23 +65,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Bookkeeping
- private KafkaSpoutStreams kafkaSpoutStreams;
- private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
- private transient Timer commitTimer; // timer == null for auto commit mode
- private transient Timer logTimer;
- private transient Map<TopicPartition, OffsetEntry> acked; // emitted tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate
- private transient int maxRetries; // Max number of times a tuple is retried
- private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
+ private transient int maxRetries; // Max number of times a tuple is retried
+ private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation
+ private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
+ private transient Timer commitTimer; // timer == null for auto commit mode
+ private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
- private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
- private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
- private transient PollStrategy pollStrategy;
+ private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples
+ private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord
- public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+ private transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+ private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
+ private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+ private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
+
+
+ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
- this.tupleBuilder = tupleBuilder;
}
@Override
@@ -89,18 +94,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
this.collector = collector;
maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
numUncommittedOffsets = 0;
- logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
- pollStrategy = kafkaSpoutConfig.getPollStrategy();
consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+ // Retries management
+ retryService = kafkaSpoutConfig.getRetryService();
+
+ // Tuples builder delegate
+ tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+
if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually
commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
- acked = new HashMap<>();
}
+ acked = new HashMap<>();
+ emitted = new HashSet<>();
+ waitingToEmit = Collections.emptyListIterator();
+
LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}
@@ -130,6 +142,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
}
+ retryService.retainAll(partitions);
+
for (TopicPartition tp : partitions) {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
final long fetchOffset = doSeek(tp, committedOffset);
@@ -182,67 +196,88 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (initialized) {
if (commit()) {
commitOffsetsForAckedTuples();
- } else if (poll()) {
- emitTuples(pollKafkaBroker());
- } else if (logTimer.isExpiredResetOnTrue()) { // to limit the number of messages that get printed.
- log();
+ }
+
+ if (poll()) {
+ setWaitingToEmit(pollKafkaBroker());
+ }
+
+ if (waitingToEmit()) {
+ emit();
}
} else {
LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
}
}
- private void log() {
- switch(pollStrategy) {
- case STREAM:
- LOG.trace("Reached the maximum number number of uncommitted records [{}]. " +
- "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ",
- numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets());
- break;
- case BATCH:
- LOG.trace("No more polls will occur until the last batch completes. [{}] emitted tuples pending", numUncommittedOffsets);
- break;
- default:
- throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
- }
+ private boolean commit() {
+ return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
+ }
+ private boolean poll() {
+ return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
}
- // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory
- private boolean poll() {
- switch(pollStrategy) {
- case STREAM:
- return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
- case BATCH:
- return consumerAutoCommitMode || numUncommittedOffsets <= 0;
- default:
- throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
- }
+ private boolean waitingToEmit() {
+ return waitingToEmit != null && waitingToEmit.hasNext();
}
- private boolean commit() {
- return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
+ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
+ List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
+ for (TopicPartition tp : consumerRecords.partitions()) {
+ waitingToEmitList.addAll(consumerRecords.records(tp));
+ }
+ waitingToEmit = waitingToEmitList.iterator();
+ LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
}
+ // ======== poll =========
private ConsumerRecords<K, V> pollKafkaBroker() {
+ doSeekRetriableTopicPartitions();
+
final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
final int numPolledRecords = consumerRecords.count();
- numUncommittedOffsets+= numPolledRecords;
LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
return consumerRecords;
}
- private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
- for (TopicPartition tp : consumerRecords.partitions()) {
- final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic());
+ private void doSeekRetriableTopicPartitions() {
+ final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+
+ for (TopicPartition rtp : retriableTopicPartitions) {
+ final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
+ if (offsetAndMeta != null) {
+ kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
+ } else {
+ kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset
+ }
+ }
+ }
- for (final ConsumerRecord<K, V> record : records) {
- final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams);
- final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple);
+ // ======== emit =========
+ private void emit() {
+ emitTupleIfNotEmitted(waitingToEmit.next());
+ waitingToEmit.remove();
+ }
- kafkaSpoutStreams.emit(collector, messageId); // emits one tuple per record
- LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+ // emits one tuple per record
+ private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+ final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+
+ if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
+ LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
+ } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
+ LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
+ } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+ final List<Object> tuple = tuplesBuilder.buildTuple(record);
+ kafkaSpoutStreams.emit(collector, tuple, msgId);
+ emitted.add(msgId);
+ numUncommittedOffsets++;
+ if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
+ retryService.remove(msgId); // re-emitted hence remove from failed
}
+ LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
}
@@ -275,11 +310,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void ack(Object messageId) {
+ final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
- final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
acked.get(msgId.getTopicPartition()).add(msgId);
LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
}
+ emitted.remove(msgId);
}
// ======== Fail =======
@@ -287,10 +323,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void fail(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+ emitted.remove(msgId);
if (msgId.numFails() < maxRetries) {
msgId.incrementNumFails();
- kafkaSpoutStreams.emit(collector, msgId);
- LOG.trace("Retried tuple with message id [{}]", msgId);
+ retryService.schedule(msgId);
} else { // limit to max number of retries
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
ack(msgId);
@@ -367,7 +403,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
this.tp = tp;
this.initialFetchOffset = initialFetchOffset;
this.committedOffset = initialFetchOffset - 1;
- LOG.debug("Created OffsetEntry. {}", this);
+ LOG.debug("Instantiated {}", this);
}
public void add(KafkaSpoutMessageId msgId) { // O(Log N)
@@ -434,6 +470,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
return ackedMsgs.isEmpty();
}
+ public boolean contains(ConsumerRecord record) {
+ return contains(new KafkaSpoutMessageId(record));
+ }
+
+ public boolean contains(KafkaSpoutMessageId msgId) {
+ return ackedMsgs.contains(msgId);
+ }
+
@Override
public String toString() {
return "OffsetEntry{" +
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index d969f1f..29cedb2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,7 +18,9 @@
package org.apache.storm.kafka.spout;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import java.io.Serializable;
import java.util.ArrayList;
@@ -63,24 +65,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
UNCOMMITTED_EARLIEST,
UNCOMMITTED_LATEST }
- /**
- * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
- * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
- * (it stores in memory the entire KafkaRecord,including data). BATCH will likely have less throughput but also use less memory.
- * The BATCH behavior is similar to the behavior of the previous Kafka Spout. De default value is STREAM.
- * <ul>
- * <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
- * threshold has not yet been reached. When the threshold his reached, no more records are polled until enough offsets have been
- * committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
- * </li>
- * <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll have been acked.</li>
- * </ul>
- */
- public enum PollStrategy {
- STREAM,
- BATCH
- }
-
// Kafka consumer configuration
private final Map<String, Object> kafkaProps;
private final Deserializer<K> keyDeserializer;
@@ -92,8 +76,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private final int maxRetries;
private final int maxUncommittedOffsets;
private final FirstPollOffsetStrategy firstPollOffsetStrategy;
- private final PollStrategy pollStrategy;
private final KafkaSpoutStreams kafkaSpoutStreams;
+ private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
+ private final KafkaSpoutRetryService retryService;
private KafkaSpoutConfig(Builder<K,V> builder) {
this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
@@ -103,9 +88,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
this.maxRetries = builder.maxRetries;
this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
- this.pollStrategy = builder.pollStrategy;
this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+ this.tuplesBuilder = builder.tuplesBuilder;
+ this.retryService = builder.retryService;
}
private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
@@ -117,33 +103,61 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
public static class Builder<K,V> {
- private Map<String, Object> kafkaProps;
+ private final Map<String, Object> kafkaProps;
private Deserializer<K> keyDeserializer;
private Deserializer<V> valueDeserializer;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
private int maxRetries = DEFAULT_MAX_RETRIES;
private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
- private KafkaSpoutStreams kafkaSpoutStreams;
+ private final KafkaSpoutStreams kafkaSpoutStreams;
private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
- private PollStrategy pollStrategy = PollStrategy.STREAM;
+ private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
+ private final KafkaSpoutRetryService retryService;
+
+ /**
+ * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
+ * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/>
+ * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+ * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
+ */
+ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+ KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
+ this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
+ new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+ DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
+ }
/***
* KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
* The optional configuration can be specified using the set methods of this builder
* @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
* @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
+ * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
+ * @param retryService logic that manages the retrial of failed tuples
*/
- public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) {
+ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+ KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
if (kafkaProps == null || kafkaProps.isEmpty()) {
- throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps);
+ throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
}
if (kafkaSpoutStreams == null) {
- throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream.");
+ throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
+ }
+
+ if (tuplesBuilder == null) {
+ throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
}
+
+ if (retryService == null) {
+ throw new IllegalArgumentException("Must specify at implementation of retry service");
+ }
+
this.kafkaProps = kafkaProps;
this.kafkaSpoutStreams = kafkaSpoutStreams;
+ this.tuplesBuilder = tuplesBuilder;
+ this.retryService = retryService;
}
/**
@@ -214,16 +228,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return this;
}
- /**
- * Sets the strategy used by the the Kafka spout to decide when to poll the next batch of records from Kafka.
- * Please refer to to the documentation in {@link PollStrategy}
- * @param pollStrategy strategy used to decide when to poll
- * */
- public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) {
- this.pollStrategy = pollStrategy;
- return this;
- }
-
public KafkaSpoutConfig<K,V> build() {
return new KafkaSpoutConfig<>(this);
}
@@ -258,6 +262,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return (String) kafkaProps.get(Consumer.GROUP_ID);
}
+ /**
+ * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}
+ */
public List<String> getSubscribedTopics() {
return new ArrayList<>(kafkaSpoutStreams.getTopics());
}
@@ -278,8 +285,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return maxUncommittedOffsets;
}
- public PollStrategy getPollStrategy() {
- return pollStrategy;
+ public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
+ return tuplesBuilder;
+ }
+
+ public KafkaSpoutRetryService getRetryService() {
+ return retryService;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 0a6b126..71f8327 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -21,23 +21,18 @@ package org.apache.storm.kafka.spout;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
-import java.util.Collections;
-import java.util.List;
-
public class KafkaSpoutMessageId {
private transient TopicPartition topicPart;
private transient long offset;
- private transient List<Object> tuple;
private transient int numFails = 0;
- public KafkaSpoutMessageId(ConsumerRecord consumerRecord, List<Object> tuple) {
- this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple);
+ public KafkaSpoutMessageId(ConsumerRecord consumerRecord) {
+ this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
}
- public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) {
+ public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
this.topicPart = topicPart;
this.offset = offset;
- this.tuple = tuple;
}
public int partition() {
@@ -64,10 +59,6 @@ public class KafkaSpoutMessageId {
return topicPart;
}
- public List<Object> getTuple() {
- return Collections.unmodifiableList(tuple);
- }
-
public String getMetadata(Thread currThread) {
return "{" +
"topic-partition=" + topicPart +
@@ -83,7 +74,6 @@ public class KafkaSpoutMessageId {
"topic-partition=" + topicPart +
", offset=" + offset +
", numFails=" + numFails +
- ", tuple=" + tuple +
'}';
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
new file mode 100644
index 0000000..208cef4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
+ * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ */
+public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
+ private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
+
+ private TimeInterval initialDelay;
+ private TimeInterval delayPeriod;
+ private TimeInterval maxDelay;
+ private int maxRetries;
+
+ private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+ private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
+
+ /**
+ * Comparator ordering {@link RetrySchedule} entries by ascending next retry timestamp.
+ * A {@link TreeSet} treats two elements that compare as 0 as duplicates and keeps only the first,
+ * so ties on the timestamp are broken by identity hash. This allows two distinct entries that are
+ * scheduled for the very same nanosecond to coexist in the set.
+ */
+ private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+ @Override
+ public int compare(RetrySchedule entry1, RetrySchedule entry2) {
+ if (entry1 == entry2) {
+ return 0; // same instance is always equal to itself
+ }
+ // Long.compare avoids the boxing done by Long.valueOf(..).compareTo(..)
+ final int byTime = Long.compare(entry1.nextRetryTimeNanos(), entry2.nextRetryTimeNanos());
+ if (byTime != 0) {
+ return byTime;
+ }
+ // Same timestamp, different entries: use an arbitrary but stable tie-break so neither entry is dropped
+ return Integer.compare(System.identityHashCode(entry1), System.identityHashCode(entry2));
+ }
+ }
+
+ private class RetrySchedule {
+ private KafkaSpoutMessageId msgId;
+ private long nextRetryTimeNanos;
+
+ public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
+ this.msgId = msgId;
+ this.nextRetryTimeNanos = nextRetryTime;
+ LOG.debug("Created {}", this);
+ }
+
+ public void setNextRetryTime() {
+ nextRetryTimeNanos = nextTime(msgId);
+ LOG.debug("Updated {}", this);
+ }
+
+ public boolean retry(long currentTimeNanos) {
+ return nextRetryTimeNanos <= currentTimeNanos;
+ }
+
+ @Override
+ public String toString() {
+ return "RetrySchedule{" +
+ "msgId=" + msgId +
+ ", nextRetryTime=" + nextRetryTimeNanos +
+ '}';
+ }
+
+ public KafkaSpoutMessageId msgId() {
+ return msgId;
+ }
+
+ public long nextRetryTimeNanos() {
+ return nextRetryTimeNanos;
+ }
+ }
+
+ public static class TimeInterval implements Serializable {
+ private long lengthNanos;
+ private long length;
+ private TimeUnit timeUnit;
+
+ /**
+ * @param length length of the time interval in the units specified by {@link TimeUnit}
+ * @param timeUnit unit in which {@code length} is expressed
+ */
+ public TimeInterval(long length, TimeUnit timeUnit) {
+ this.length = length;
+ this.timeUnit = timeUnit;
+ this.lengthNanos = timeUnit.toNanos(length);
+ }
+
+ public static TimeInterval seconds(long length) {
+ return new TimeInterval(length, TimeUnit.SECONDS);
+ }
+
+ public static TimeInterval milliSeconds(long length) {
+ return new TimeInterval(length, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @param length length of the time interval, in microseconds
+ * @return a {@link TimeInterval} of {@code length} microseconds
+ */
+ public static TimeInterval microSeconds(long length) {
+ return new TimeInterval(length, TimeUnit.MICROSECONDS);
+ }
+
+ public long lengthNanos() {
+ return lengthNanos;
+ }
+
+ public long length() {
+ return length;
+ }
+
+ public TimeUnit timeUnit() {
+ return timeUnit;
+ }
+
+ @Override
+ public String toString() {
+ return "TimeInterval{" +
+ "length=" + length +
+ ", timeUnit=" + timeUnit +
+ '}';
+ }
+ }
+
+ /**
+ * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
+ * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ *
+ * @param initialDelay initial delay of the first retry
+ * @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
+ * @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit
+ * @param maxDelay maximum amount of time waiting before retrying
+ *
+ */
+ public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
+ this.initialDelay = initialDelay;
+ this.delayPeriod = delayPeriod;
+ this.maxRetries = maxRetries;
+ this.maxDelay = maxDelay;
+ LOG.debug("Instantiated {}", this);
+ }
+
+ /**
+ * Collects the topic partitions that own at least one scheduled entry whose retry time has been reached.
+ * Entries are visited in ascending retry-time order, so the scan stops at the first entry that is not yet due.
+ *
+ * @return set of topic partitions with entries ready to be retried; empty if none is due
+ */
+ @Override
+ public Set<TopicPartition> retriableTopicPartitions() {
+ // HashSet, not TreeSet: TopicPartition does not implement Comparable, so a TreeSet
+ // without a comparator throws ClassCastException on the first add
+ final Set<TopicPartition> tps = new HashSet<>();
+ final long currentTimeNanos = System.nanoTime();
+ for (RetrySchedule retrySchedule : retrySchedules) {
+ if (retrySchedule.retry(currentTimeNanos)) {
+ final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+ tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+ } else {
+ break; // Stop searching as soon as passed current time
+ }
+ }
+ LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
+ return tps;
+ }
+
+ @Override
+ public boolean isReady(KafkaSpoutMessageId msgId) {
+ boolean retry = false;
+ if (toRetryMsgs.contains(msgId)) {
+ final long currentTimeNanos = System.nanoTime();
+ for (RetrySchedule retrySchedule : retrySchedules) {
+ if (retrySchedule.retry(currentTimeNanos)) {
+ if (retrySchedule.msgId.equals(msgId)) {
+ retry = true;
+ LOG.debug("Found entry to retry {}", retrySchedule);
+ }
+ } else {
+ LOG.debug("Entry to retry not found {}", retrySchedule);
+ break; // Stop searching as soon as passed current time
+ }
+ }
+ }
+ return retry;
+ }
+
+ @Override
+ public boolean isScheduled(KafkaSpoutMessageId msgId) {
+ return toRetryMsgs.contains(msgId);
+ }
+
+ @Override
+ public boolean remove(KafkaSpoutMessageId msgId) {
+ boolean removed = false;
+ if (toRetryMsgs.contains(msgId)) {
+ for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+ final RetrySchedule retrySchedule = iterator.next();
+ if (retrySchedule.msgId().equals(msgId)) {
+ iterator.remove();
+ toRetryMsgs.remove(msgId);
+ removed = true;
+ break;
+ }
+ }
+ }
+ LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
+ LOG.trace("Current state {}", retrySchedules);
+ return removed;
+ }
+
+ @Override
+ public boolean retainAll(Collection<TopicPartition> topicPartitions) {
+ boolean result = false;
+ for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
+ final RetrySchedule retrySchedule = rsIterator.next();
+ final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+ final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition());
+ if (!topicPartitions.contains(tpRetry)) {
+ rsIterator.remove();
+ toRetryMsgs.remove(msgId);
+ LOG.debug("Removed {}", retrySchedule);
+ LOG.trace("Current state {}", retrySchedules);
+ result = true;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void schedule(KafkaSpoutMessageId msgId) {
+ if (msgId.numFails() > maxRetries) {
+ LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+ } else {
+ if (toRetryMsgs.contains(msgId)) {
+ for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+ final RetrySchedule retrySchedule = iterator.next();
+ if (retrySchedule.msgId().equals(msgId)) {
+ iterator.remove();
+ toRetryMsgs.remove(msgId);
+ }
+ }
+ }
+ final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
+ retrySchedules.add(retrySchedule);
+ toRetryMsgs.add(msgId);
+ LOG.debug("Scheduled. {}", retrySchedule);
+ LOG.trace("Current state {}", retrySchedules);
+ }
+ }
+
+ // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
+ private long nextTime(KafkaSpoutMessageId msgId) {
+ final long currentTimeNanos = System.nanoTime();
+ final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
+ ? currentTimeNanos + initialDelay.lengthNanos()
+ : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1));
+ return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaSpoutRetryExponentialBackoff{" +
+ "delay=" + initialDelay +
+ ", ratio=" + delayPeriod +
+ ", maxRetries=" + maxRetries +
+ ", maxRetryDelay=" + maxDelay +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
new file mode 100644
index 0000000..5aab167
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Represents the logic that manages the retrial of failed tuples.
+ */
+public interface KafkaSpoutRetryService extends Serializable {
+ /**
+ * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+ * @param msgId message to schedule for retrial
+ */
+ void schedule(KafkaSpoutMessageId msgId);
+
+ /**
+ * Removes a message from the list of messages scheduled for retrial
+ * @param msgId message to remove from retrial
+ */
+ boolean remove(KafkaSpoutMessageId msgId);
+
+ /**
+ * Retains all the messages whose {@link TopicPartition} belongs to the specified {@code Collection<TopicPartition>}.
+ * All messages that come from a {@link TopicPartition} NOT existing in the collection will be removed.
+ * This method is useful to cleanup state following partition rebalance.
+ * @param topicPartitions Collection of {@link TopicPartition} for which to keep messages
+ * @return true if at least one message was removed, false otherwise
+ */
+ boolean retainAll(Collection<TopicPartition> topicPartitions);
+
+ /**
+ * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
+ * for which a tuple has failed and has retry time less than current time
+ */
+ Set<TopicPartition> retriableTopicPartitions();
+
+ /**
+ * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
+ * i.e. is scheduled and has retry time that is less than current time.
+ * @return true if message is ready to be retried, false otherwise
+ */
+ boolean isReady(KafkaSpoutMessageId msgId);
+
+ /**
+ * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
+ * The message may or may not be ready to be retried yet.
+ * @return true if the message is scheduled to be retried, regardless of whether it is ready to be retried.
+ * Returns false if this message is not scheduled for retrial
+ */
+ boolean isScheduled(KafkaSpoutMessageId msgId);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
index 43464a9..064a8bb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -31,27 +31,31 @@ public class KafkaSpoutStream implements Serializable {
private final String streamId;
private final String topic;
- /** Declare specified outputFields with default stream for the specified topic */
+ /** Represents the specified outputFields and topic with the default stream */
KafkaSpoutStream(Fields outputFields, String topic) {
this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
}
- /** Declare specified outputFields with specified stream for the specified topic */
+ /** Represents the specified outputFields and topic with the specified stream */
KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+ if (outputFields == null || streamId == null || topic == null) {
+ throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
+ "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
+ }
this.outputFields = outputFields;
this.streamId = streamId;
this.topic = topic;
}
- public Fields getOutputFields() {
+ Fields getOutputFields() {
return outputFields;
}
- public String getStreamId() {
+ String getStreamId() {
return streamId;
}
- public String getTopic() {
+ String getTopic() {
return topic;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
index 30215d1..dc5892e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -33,7 +33,7 @@ import java.util.List;
import java.util.Map;
/**
- * Represents the output streams associated with each topic, and provides a public API to
+ * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to
* declare output streams and emmit tuples, on the appropriate stream, for all the topics specified.
*/
public class KafkaSpoutStreams implements Serializable {
@@ -48,7 +48,7 @@ public class KafkaSpoutStreams implements Serializable {
/**
* @param topic the topic for which to get output fields
- * @return the output fields declared
+ * @return the declared output fields
*/
public Fields getOutputFields(String topic) {
if (topicToStream.containsKey(topic)) {
@@ -79,7 +79,7 @@ public class KafkaSpoutStreams implements Serializable {
return new ArrayList<>(topicToStream.keySet());
}
- void declareOutputFields(OutputFieldsDeclarer declarer) {
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
for (KafkaSpoutStream stream : topicToStream.values()) {
if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
@@ -88,8 +88,8 @@ public class KafkaSpoutStreams implements Serializable {
}
}
- void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) {
- collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
+ public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
+ collector.emit(getStreamId(messageId.topic()), tuple, messageId);
}
@Override
@@ -103,11 +103,11 @@ public class KafkaSpoutStreams implements Serializable {
private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();;
/**
- * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified.
- * All the topics will have the same stream id and output fields.
+ * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified.
+ * All topics will have the same stream id and output fields.
*/
public Builder(Fields outputFields, String... topics) {
- this(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+ addStream(outputFields, topics);
}
/**
@@ -115,16 +115,14 @@ public class KafkaSpoutStreams implements Serializable {
* All the topics will have the same stream id and output fields.
*/
public Builder (Fields outputFields, String streamId, String... topics) {
- for (String topic : topics) {
- topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
- }
+ addStream(outputFields, streamId, topics);
}
/**
* Adds this stream to the state representing the streams associated with each topic
*/
public Builder(KafkaSpoutStream stream) {
- topicToStream.put(stream.getTopic(), stream);
+ addStream(stream);
}
/**
@@ -139,9 +137,7 @@ public class KafkaSpoutStreams implements Serializable {
* Please refer to javadoc in {@link #Builder(Fields, String...)}
*/
public Builder addStream(Fields outputFields, String... topics) {
- for (String topic : topics) {
- topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic));
- }
+ addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
return this;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
index 45aab48..3bb71a8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
@@ -21,8 +21,38 @@ package org.apache.storm.kafka.spout;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-public interface KafkaSpoutTupleBuilder<K,V> extends Serializable {
- List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams);
+/**
+ * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s.
+ * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder}
+ */
+public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable {
+ private List<String> topics;
+
+ /**
+ * @param topics list of topics that use this implementation to build tuples
+ */
+ public KafkaSpoutTupleBuilder(String... topics) {
+ if (topics == null || topics.length == 0) {
+ throw new IllegalArgumentException("Must specify at least one topic. It cannot be null or empty");
+ }
+ this.topics = Arrays.asList(topics);
+ }
+
+ /**
+ * @return list of topics that use this implementation to build tuples
+ */
+ public List<String> getTopics() {
+ return Collections.unmodifiableList(topics);
+ }
+
+ /**
+ * Builds a list of tuples using the ConsumerRecord specified as parameter
+ * @param consumerRecord whose contents are used to build tuples
+ * @return list of tuples
+ */
+ public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
new file mode 100644
index 0000000..d67c69d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s.
+ * The logic is provided by the user by implementing the appropriate number of {@link KafkaSpoutTupleBuilder} instances
+ */
+public class KafkaSpoutTuplesBuilder<K,V> implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilder.class);
+
+ private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+ private KafkaSpoutTuplesBuilder(Builder<K,V> builder) {
+ this.topicToTupleBuilders = builder.topicToTupleBuilders;
+ LOG.debug("Instantiated {}", this);
+ }
+
+ public static class Builder<K,V> {
+ private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
+ private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+ /**
+ * @param tupleBuilders the tuple builders to register; their topics are mapped when {@link #build()} is called
+ * @throws IllegalArgumentException if {@code tupleBuilders} is null or empty
+ */
+ @SafeVarargs
+ public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) {
+ if (tupleBuilders == null || tupleBuilders.length == 0) {
+ throw new IllegalArgumentException("Must specify at least one tuple builder per topic declared in KafkaSpoutStreams");
+ }
+
+ this.tupleBuilders = Arrays.asList(tupleBuilders);
+ topicToTupleBuilders = new HashMap<>();
+ }
+
+ public KafkaSpoutTuplesBuilder<K,V> build() {
+ for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
+ for (String topic : tupleBuilder.getTopics()) {
+ if (!topicToTupleBuilders.containsKey(topic)) {
+ topicToTupleBuilders.put(topic, tupleBuilder);
+ }
+ }
+ }
+ return new KafkaSpoutTuplesBuilder<>(this);
+ }
+ }
+
+ public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) {
+ final String topic = consumerRecord.topic();
+ return topicToTupleBuilders.get(topic).buildTuple(consumerRecord);
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaSpoutTuplesBuilder{" +
+ "topicToTupleBuilders=" + topicToTupleBuilders +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
new file mode 100644
index 0000000..7a94a50
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KafkaSpoutTestBolt extends BaseRichBolt {
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestBolt.class);
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ LOG.debug("input = [" + input + "]");
+ collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
index 4fcc3ef..0691dd3 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
@@ -22,11 +22,13 @@ import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
@@ -35,9 +37,9 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM;
public class KafkaSpoutTopologyMain {
private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
@@ -80,21 +82,29 @@ public class KafkaSpoutTopologyMain {
public static StormTopology getTopolgyKafkaSpout() {
final TopologyBuilder tp = new TopologyBuilder();
- tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1);
- tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
- tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+ tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+ tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
return tp.createTopology();
}
public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
- return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams)
+ return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
- .setPollStrategy(STREAM)
.setMaxUncommittedOffsets(250)
.build();
}
+ private static KafkaSpoutRetryService getRetryService() { // builds the exponential-backoff retry service passed to the KafkaSpoutConfig builder
+ return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, TimeUnit.MICROSECONDS), // NOTE(review): 500 MICROseconds, while the other intervals are in ms/s - confirm the unit is intended
+ TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); // presumably delay period, max retries, max delay - verify against the constructor
+ }
+
+ private static TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) { // thin wrapper around the TimeInterval constructor
+ return new TimeInterval(delay, timeUnit);
+ }
+
public static Map<String,Object> getKafkaConsumerProps() {
Map<String, Object> props = new HashMap<>();
// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
@@ -105,16 +115,19 @@ public class KafkaSpoutTopologyMain {
return props;
}
- public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() {
- return new KafkaRecordTupleBuilder<>();
+ public static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+ return new KafkaSpoutTuplesBuilder.Builder<>(
+ new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
+ new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
+ .build();
}
public static KafkaSpoutStreams getKafkaSpoutStreams() {
final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
final Fields outputFields1 = new Fields("topic", "partition", "offset");
return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream
- .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream
- .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream2
+ .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream
+ .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream
.build();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
deleted file mode 100644
index c9ff9d5..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class KafkaTestBolt extends BaseRichBolt {
- protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class);
-
-
- private OutputCollector collector;
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- LOG.debug("input = [" + input + "]");
- collector.ack(input);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
new file mode 100644
index 0000000..ca65177
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> { // test tuple builder that emits record metadata only
+ /**
+ * @param topics topics whose records are turned into tuples by this implementation; forwarded unchanged to the superclass
+ */
+ public TopicTest2TupleBuilder(String... topics) {
+ super(topics);
+ }
+
+ @Override
+ public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+ return new Values(consumerRecord.topic(), // tuple layout: (topic, partition, offset); key and value are not included
+ consumerRecord.partition(),
+ consumerRecord.offset());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
new file mode 100644
index 0000000..4c55aa1
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> { // test tuple builder that emits record metadata plus key and value
+ /**
+ * @param topics topics whose records are turned into tuples by this implementation; forwarded unchanged to the superclass
+ */
+ public TopicsTest0Test1TupleBuilder(String... topics) {
+ super(topics);
+ }
+
+ @Override
+ public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+ return new Values(consumerRecord.topic(), // tuple layout: (topic, partition, offset, key, value)
+ consumerRecord.partition(),
+ consumerRecord.offset(),
+ consumerRecord.key(),
+ consumerRecord.value());
+ }
+}
\ No newline at end of file