You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/31 21:00:10 UTC
[3/3] storm git commit: STORM-822: Kafka Spout New Consumer API
STORM-822: Kafka Spout New Consumer API
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d26b984d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d26b984d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d26b984d
Branch: refs/heads/master
Commit: d26b984d74987c95badbdf8a73c74c4304bd4ec9
Parents: 955b445
Author: Hugo Louro <hm...@gmail.com>
Authored: Mon Dec 14 10:16:42 2015 -0800
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 16 +-
external/flux/flux-examples/pom.xml | 13 +-
external/flux/pom.xml | 13 +-
external/sql/storm-sql-kafka/pom.xml | 16 +-
external/storm-kafka-client/README.md | 9 +
external/storm-kafka-client/pom.xml | 86 ++++
.../kafka/spout/KafkaRecordTupleBuilder.java | 44 ++
.../apache/storm/kafka/spout/KafkaSpout.java | 503 +++++++++++++++++++
.../storm/kafka/spout/KafkaSpoutConfig.java | 298 +++++++++++
.../storm/kafka/spout/KafkaSpoutMessageId.java | 111 ++++
.../storm/kafka/spout/KafkaSpoutStream.java | 66 +++
.../storm/kafka/spout/KafkaSpoutStreams.java | 162 ++++++
.../kafka/spout/KafkaSpoutTupleBuilder.java | 28 ++
.../spout/test/KafkaSpoutTopologyMain.java | 120 +++++
.../storm/kafka/spout/test/KafkaTestBolt.java | 52 ++
external/storm-kafka/pom.xml | 16 +-
external/storm-solr/pom.xml | 8 +-
pom.xml | 57 ++-
storm-dist/binary/src/main/assembly/binary.xml | 14 +
19 files changed, 1544 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 0b5fd97..112f1e8 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -158,26 +158,12 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.1</version>
- <!-- use provided scope, so users can pull in whichever scala version they choose -->
+ <artifactId>${kafka.artifact.id}</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.8.2.1</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/flux/flux-examples/pom.xml
----------------------------------------------------------------------
diff --git a/external/flux/flux-examples/pom.xml b/external/flux/flux-examples/pom.xml
index 28d7239..48cc151 100644
--- a/external/flux/flux-examples/pom.xml
+++ b/external/flux/flux-examples/pom.xml
@@ -95,18 +95,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.1.1</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
+ <artifactId>${kafka.artifact.id}</artifactId>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/flux/pom.xml
----------------------------------------------------------------------
diff --git a/external/flux/pom.xml b/external/flux/pom.xml
index 1fd1683..56d9bab 100644
--- a/external/flux/pom.xml
+++ b/external/flux/pom.xml
@@ -78,19 +78,8 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.1.1</version>
+ <artifactId>${kafka.artifact.id}</artifactId>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/sql/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
index 450611e..0642d17 100644
--- a/external/sql/storm-sql-kafka/pom.xml
+++ b/external/sql/storm-sql-kafka/pom.xml
@@ -63,26 +63,12 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.1</version>
- <!-- use provided scope, so users can pull in whichever scala version they choose -->
+ <artifactId>${kafka.artifact.id}</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.8.2.1</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
new file mode 100644
index 0000000..8ac15f5
--- /dev/null
+++ b/external/storm-kafka-client/README.md
@@ -0,0 +1,9 @@
+# Storm Kafka Spout New Consumer API
+
+This patch is still under development and it comes with no warranties at this moment.
+
+It has not been thoroughly tested, and therefore there may be some bugs and it is not ready for production.
+
+The documentation will be uploaded soon.
+
+To see how to use the new Kafka Spout, please refer to the example under tests. Thank you!
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
new file mode 100644
index 0000000..6c82b6a
--- /dev/null
+++ b/external/storm-kafka-client/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-kafka-client</artifactId>
+ <name>storm-kafka-client</name>
+
+ <packaging>jar</packaging>
+
+ <developers>
+ <developer>
+ <id>hmcl</id>
+ <name>Hugo Louro</name>
+ <email>hmclouro@gmail.com</email>
+ </developer>
+ </developers>
+
+ <dependencies>
+ <!--parent module dependency-->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!--kafka libraries-->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <!--test dependencies -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
new file mode 100644
index 0000000..4d67632
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> {
+ @Override
+ public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
+ final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic());
+ if (outputFields != null) {
+ if (outputFields.size() == 3) {
+ return new Values(consumerRecord.topic(),
+ consumerRecord.partition(),
+ consumerRecord.offset());
+ } else if (outputFields.size() == 5) {
+ return new Values(consumerRecord.topic(),
+ consumerRecord.partition(),
+ consumerRecord.offset(),
+ consumerRecord.key(),
+ consumerRecord.value());
+ }
+ }
+ throw new RuntimeException("Failed to build tuple. " + consumerRecord + " " + kafkaSpoutStreams);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
new file mode 100644
index 0000000..9a49ee8
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaSpout<K, V> extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+ private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
+
+ // Storm
+ protected SpoutOutputCollector collector;
+
+ // Kafka
+ private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private transient KafkaConsumer<K, V> kafkaConsumer;
+ private transient boolean consumerAutoCommitMode;
+
+
+ // Bookkeeping
+ private KafkaSpoutStreams kafkaSpoutStreams;
+ private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
+ private transient Timer commitTimer; // timer == null for auto commit mode
+ private transient Timer logTimer;
+ private transient Map<TopicPartition, OffsetEntry> acked; // emitted tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate
+ private transient int maxRetries; // Max number of times a tuple is retried
+ private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
+ // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
+ private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
+ private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private transient PollStrategy pollStrategy;
+
+
+ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+ this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
+ this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+ this.tupleBuilder = tupleBuilder;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ initialized = false;
+
+ // Spout internals
+ this.collector = collector;
+ maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+ numUncommittedOffsets = 0;
+ logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
+
+ // Offset management
+ firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+ pollStrategy = kafkaSpoutConfig.getPollStrategy();
+ consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+
+ if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually
+ commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+ acked = new HashMap<>();
+ }
+
+ LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
+ }
+
+ // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+
+ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+ if (!consumerAutoCommitMode && initialized) {
+ initialized = false;
+ commitOffsetsForAckedTuples();
+ }
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+
+ initialize(partitions);
+ }
+
+ private void initialize(Collection<TopicPartition> partitions) {
+ if (!consumerAutoCommitMode) {
+ acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
+ }
+
+ for (TopicPartition tp : partitions) {
+ final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+ final long fetchOffset = doSeek(tp, committedOffset);
+ setAcked(tp, fetchOffset);
+ }
+ initialized = true;
+ LOG.debug("Initialization complete");
+ }
+
+ /**
+ * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+ */
+ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
+ long fetchOffset;
+ if (committedOffset != null) { // offset was committed for this TopicPartition
+ if (firstPollOffsetStrategy.equals(EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else {
+ // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
+ fetchOffset = committedOffset.offset() + 1;
+ kafkaConsumer.seek(tp, fetchOffset);
+ }
+ } else { // no commits have ever been done, so start at the beginning or end depending on the strategy
+ if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ }
+ fetchOffset = kafkaConsumer.position(tp);
+ }
+ return fetchOffset;
+ }
+ }
+
+ private void setAcked(TopicPartition tp, long fetchOffset) {
+ // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+ if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
+ acked.put(tp, new OffsetEntry(tp, fetchOffset));
+ }
+ }
+
+ // ======== Next Tuple =======
+
+ @Override
+ public void nextTuple() {
+ if (initialized) {
+ if (commit()) {
+ commitOffsetsForAckedTuples();
+ } else if (poll()) {
+ emitTuples(pollKafkaBroker());
+ } else if (logTimer.isExpiredResetOnTrue()) { // to limit the number of messages that get printed.
+ log();
+ }
+ } else {
+ LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+ }
+ }
+
+ private void log() {
+ switch(pollStrategy) {
+ case STREAM:
+ LOG.trace("Reached the maximum number number of uncommitted records [{}]. " +
+ "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ",
+ numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets());
+ break;
+ case BATCH:
+ LOG.trace("No more polls will occur until the last batch completes. [{}] emitted tuples pending", numUncommittedOffsets);
+ break;
+ default:
+ throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
+ }
+
+ }
+
+ // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory
+ private boolean poll() {
+ switch(pollStrategy) {
+ case STREAM:
+ return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
+ case BATCH:
+ return consumerAutoCommitMode || numUncommittedOffsets <= 0;
+ default:
+ throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
+ }
+ }
+
+ private boolean commit() {
+ return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
+ }
+
+ private ConsumerRecords<K, V> pollKafkaBroker() {
+ final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+ final int numPolledRecords = consumerRecords.count();
+ numUncommittedOffsets+= numPolledRecords;
+ LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
+ return consumerRecords;
+ }
+
+ private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
+ for (TopicPartition tp : consumerRecords.partitions()) {
+ final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic());
+
+ for (final ConsumerRecord<K, V> record : records) {
+ final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams);
+ final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple);
+
+ kafkaSpoutStreams.emit(collector, messageId); // emits one tuple per record
+ LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+ }
+ }
+ }
+
+ private void commitOffsetsForAckedTuples() {
+ // Find offsets that are ready to be committed for every topic partition
+ final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+ final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
+ if (nextCommitOffset != null) {
+ nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
+ }
+ }
+
+ // Commit offsets that are ready to be committed for every topic partition
+ if (!nextCommitOffsets.isEmpty()) {
+ kafkaConsumer.commitSync(nextCommitOffsets);
+ LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
+ // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
+ // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
+ for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+ final OffsetEntry offsetEntry = tpOffset.getValue();
+ offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
+ }
+ } else {
+ LOG.trace("No offsets to commit. {}", this);
+ }
+ }
+
+ // ======== Ack =======
+
+ @Override
+ public void ack(Object messageId) {
+ if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
+ final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+ acked.get(msgId.getTopicPartition()).add(msgId);
+ LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
+ }
+ }
+
+ // ======== Fail =======
+
+ @Override
+ public void fail(Object messageId) {
+ final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+ if (msgId.numFails() < maxRetries) {
+ msgId.incrementNumFails();
+ kafkaSpoutStreams.emit(collector, msgId);
+ LOG.trace("Retried tuple with message id [{}]", msgId);
+ } else { // limit to max number of retries
+ LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
+ ack(msgId);
+ }
+ }
+
+ // ======== Activate / Deactivate / Close / Declare Outputs =======
+
+ @Override
+ public void activate() {
+ subscribeKafkaConsumer();
+ }
+
+ private void subscribeKafkaConsumer() {
+ kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener());
+ // Initial poll to get the consumer registration process going.
+ // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
+ kafkaConsumer.poll(0);
+ }
+
+ @Override
+ public void deactivate() {
+ shutdown();
+ }
+
+ @Override
+ public void close() {
+ shutdown();
+ }
+
+ private void shutdown() {
+ try {
+ kafkaConsumer.wakeup();
+ if (!consumerAutoCommitMode) {
+ commitOffsetsForAckedTuples();
+ }
+ } finally {
+ //remove resources
+ kafkaConsumer.close();
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ kafkaSpoutStreams.declareOutputFields(declarer);
+ }
+
+ @Override
+ public String toString() {
+ return "{acked=" + acked + "} ";
+ }
+
+ // ======= Offsets Commit Management ==========
+
+ private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
+ public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
+ return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
+ }
+ }
+
+ /**
+ * This class is not thread safe
+ */
+ private class OffsetEntry {
+ private final TopicPartition tp;
+ private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
+ * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
+ private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
+ private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); // acked messages sorted by ascending order of offset
+
+ public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
+ this.tp = tp;
+ this.initialFetchOffset = initialFetchOffset;
+ this.committedOffset = initialFetchOffset - 1;
+ LOG.debug("Created OffsetEntry. {}", this);
+ }
+
+ public void add(KafkaSpoutMessageId msgId) { // O(Log N)
+ ackedMsgs.add(msgId);
+ }
+
+ /**
+ * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
+ */
+ public OffsetAndMetadata findNextCommitOffset() {
+ boolean found = false;
+ long currOffset;
+ long nextCommitOffset = committedOffset;
+ KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
+
+ for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
+ if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) { // found the next offset to commit
+ found = true;
+ nextCommitMsg = currAckedMsg;
+ nextCommitOffset = currOffset;
+ LOG.trace("Found offset to commit [{}]. {}", currOffset, this);
+ } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
+ LOG.debug("Non continuous offset found [{}]. It will be processed in a subsequent batch. {}", currOffset, this);
+ break;
+ } else {
+ LOG.debug("Unexpected offset found [{}]. {}", currOffset, this);
+ break;
+ }
+ }
+
+ OffsetAndMetadata nextCommitOffsetAndMetadata = null;
+ if (found) {
+ nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
+ LOG.debug("Offset to be committed next: [{}] {}", nextCommitOffsetAndMetadata.offset(), this);
+ } else {
+ LOG.debug("No offsets ready to commit. {}", this);
+ }
+ return nextCommitOffsetAndMetadata;
+ }
+
+ /**
+ * Marks an offset as committed. This method has side effects - it sets the internal state in such a way that future
+ * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any.
+ *
+ * @param committedOffset offset to be marked as committed
+ */
+ public void commit(OffsetAndMetadata committedOffset) {
+ if (committedOffset != null) {
+ final long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+ this.committedOffset = committedOffset.offset();
+ for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
+ if (iterator.next().offset() <= committedOffset.offset()) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+ numUncommittedOffsets-= numCommittedOffsets;
+ }
+ LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", this, numUncommittedOffsets);
+ }
+
+ public boolean isEmpty() {
+ return ackedMsgs.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return "OffsetEntry{" +
+ "topic-partition=" + tp +
+ ", fetchOffset=" + initialFetchOffset +
+ ", committedOffset=" + committedOffset +
+ ", ackedMsgs=" + ackedMsgs +
+ '}';
+ }
+ }
+
+ // =========== Timer ===========
+
+ private class Timer {
+ private final long delay;
+ private final long period;
+ private final TimeUnit timeUnit;
+ private final long periodNanos;
+ private long start;
+
+ /**
+ * Creates a class that mimics a single threaded timer that expires periodically. If a call to {@link
+ * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns
+ * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay.
+ *
+ * @param delay the initial delay before the timer starts
+ * @param period the period between calls to {@link #isExpiredResetOnTrue()}
+ * @param timeUnit the time unit of delay and period
+ */
+ public Timer(long delay, long period, TimeUnit timeUnit) {
+ this.delay = delay;
+ this.period = period;
+ this.timeUnit = timeUnit;
+
+ periodNanos = timeUnit.toNanos(period);
+ start = System.nanoTime() + timeUnit.toNanos(delay);
+ }
+
+ public long period() {
+ return period;
+ }
+
+ public long delay() {
+ return delay;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
+ /**
+ * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
+ * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
+ * (re-initiated) and a new cycle will start.
+ *
+ * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
+ * otherwise.
+ */
+ public boolean isExpiredResetOnTrue() {
+ final boolean expired = System.nanoTime() - start > periodNanos;
+ if (expired) {
+ start = System.nanoTime();
+ }
+ return expired;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
new file mode 100644
index 0000000..d969f1f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
+ */
+public class KafkaSpoutConfig<K, V> implements Serializable {
+ public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000; // 2s
+ public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000; // 15s
+ public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever
+ public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000; // 10,000 records
+
+ // Kafka property names
+ public interface Consumer {
+ String GROUP_ID = "group.id";
+ String BOOTSTRAP_SERVERS = "bootstrap.servers";
+ String ENABLE_AUTO_COMMIT = "enable.auto.commit";
+ String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
+ String KEY_DESERIALIZER = "key.deserializer";
+ String VALUE_DESERIALIZER = "value.deserializer";
+ }
+
+ /**
+ * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will
+ * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
+ * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
+ * <ul>
+ * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
+ * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
+ * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
+ * If no offset has been committed, it behaves as EARLIEST.</li>
+ * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
+ * If no offset has been committed, it behaves as LATEST.</li>
+ * </ul>
+ * */
+ public enum FirstPollOffsetStrategy {
+ EARLIEST,
+ LATEST,
+ UNCOMMITTED_EARLIEST,
+ UNCOMMITTED_LATEST }
+
+ /**
+ * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
+ * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
+ * (it stores in memory the entire KafkaRecord,including data). BATCH will likely have less throughput but also use less memory.
+ * The BATCH behavior is similar to the behavior of the previous Kafka Spout. De default value is STREAM.
+ * <ul>
+ * <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
+ * threshold has not yet been reached. When the threshold his reached, no more records are polled until enough offsets have been
+ * committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
+ * </li>
+ * <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll have been acked.</li>
+ * </ul>
+ */
+ public enum PollStrategy {
+ STREAM,
+ BATCH
+ }
+
+ // Kafka consumer configuration
+ private final Map<String, Object> kafkaProps;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valueDeserializer;
+ private final long pollTimeoutMs;
+
+ // Kafka spout configuration
+ private final long offsetCommitPeriodMs;
+ private final int maxRetries;
+ private final int maxUncommittedOffsets;
+ private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private final PollStrategy pollStrategy;
+ private final KafkaSpoutStreams kafkaSpoutStreams;
+
+ private KafkaSpoutConfig(Builder<K,V> builder) {
+ this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+ this.keyDeserializer = builder.keyDeserializer;
+ this.valueDeserializer = builder.valueDeserializer;
+ this.pollTimeoutMs = builder.pollTimeoutMs;
+ this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+ this.maxRetries = builder.maxRetries;
+ this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+ this.pollStrategy = builder.pollStrategy;
+ this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
+ this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+ }
+
+ private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+ // set defaults for properties not specified
+ if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
+ kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+ }
+ return kafkaProps;
+ }
+
+ public static class Builder<K,V> {
+ private Map<String, Object> kafkaProps;
+ private Deserializer<K> keyDeserializer;
+ private Deserializer<V> valueDeserializer;
+ private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
+ private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
+ private int maxRetries = DEFAULT_MAX_RETRIES;
+ private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+ private KafkaSpoutStreams kafkaSpoutStreams;
+ private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+ private PollStrategy pollStrategy = PollStrategy.STREAM;
+
+ /***
+ * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
+ * The optional configuration can be specified using the set methods of this builder
+ * @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
+ * @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
+ */
+ public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) {
+ if (kafkaProps == null || kafkaProps.isEmpty()) {
+ throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps);
+ }
+
+ if (kafkaSpoutStreams == null) {
+ throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream.");
+ }
+ this.kafkaProps = kafkaProps;
+ this.kafkaSpoutStreams = kafkaSpoutStreams;
+ }
+
+ /**
+ * Specifying this key deserializer overrides the property key.deserializer
+ */
+ public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+ this.keyDeserializer = keyDeserializer;
+ return this;
+ }
+
+ /**
+ * Specifying this value deserializer overrides the property value.deserializer
+ */
+ public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+ this.valueDeserializer = valueDeserializer;
+ return this;
+ }
+
+ /**
+ * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
+ * @param pollTimeoutMs time in ms
+ */
+ public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
+ this.pollTimeoutMs = pollTimeoutMs;
+ return this;
+ }
+
+ /**
+ * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
+ * @param offsetCommitPeriodMs time in ms
+ */
+ public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+ this.offsetCommitPeriodMs = offsetCommitPeriodMs;
+ return this;
+ }
+
+ /**
+ * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
+ * no new records are committed until the previous polled records have been acked. This guarantees at once delivery of
+ * all the previously polled records.
+ * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous
+ * polled records in favor of processing more records.
+ * @param maxRetries max number of retrials
+ */
+ public Builder<K,V> setMaxRetries(int maxRetries) {
+ this.maxRetries = maxRetries;
+ return this;
+ }
+
+ /**
+ * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
+ * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
+ * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+ * @param maxUncommittedOffsets max number of records that can be be pending commit
+ */
+ public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+ this.maxUncommittedOffsets = maxUncommittedOffsets;
+ return this;
+ }
+
+ /**
+ * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
+ * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+ * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
+ * */
+ public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+ this.firstPollOffsetStrategy = firstPollOffsetStrategy;
+ return this;
+ }
+
+ /**
+ * Sets the strategy used by the the Kafka spout to decide when to poll the next batch of records from Kafka.
+ * Please refer to to the documentation in {@link PollStrategy}
+ * @param pollStrategy strategy used to decide when to poll
+ * */
+ public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) {
+ this.pollStrategy = pollStrategy;
+ return this;
+ }
+
+ public KafkaSpoutConfig<K,V> build() {
+ return new KafkaSpoutConfig<>(this);
+ }
+ }
+
+ public Map<String, Object> getKafkaProps() {
+ return kafkaProps;
+ }
+
+ public Deserializer<K> getKeyDeserializer() {
+ return keyDeserializer;
+ }
+
+ public Deserializer<V> getValueDeserializer() {
+ return valueDeserializer;
+ }
+
+ public long getPollTimeoutMs() {
+ return pollTimeoutMs;
+ }
+
+ public long getOffsetsCommitPeriodMs() {
+ return offsetCommitPeriodMs;
+ }
+
+ public boolean isConsumerAutoCommitMode() {
+ return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null // default is true
+ || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT));
+ }
+
+ public String getConsumerGroupId() {
+ return (String) kafkaProps.get(Consumer.GROUP_ID);
+ }
+
+ public List<String> getSubscribedTopics() {
+ return new ArrayList<>(kafkaSpoutStreams.getTopics());
+ }
+
+ public int getMaxTupleRetries() {
+ return maxRetries;
+ }
+
+ public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
+ return firstPollOffsetStrategy;
+ }
+
+ public KafkaSpoutStreams getKafkaSpoutStreams() {
+ return kafkaSpoutStreams;
+ }
+
+ public int getMaxUncommittedOffsets() {
+ return maxUncommittedOffsets;
+ }
+
+ public PollStrategy getPollStrategy() {
+ return pollStrategy;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaSpoutConfig{" +
+ "kafkaProps=" + kafkaProps +
+ ", keyDeserializer=" + keyDeserializer +
+ ", valueDeserializer=" + valueDeserializer +
+ ", topics=" + getSubscribedTopics() +
+ ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
+ ", pollTimeoutMs=" + pollTimeoutMs +
+ ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
+ ", maxRetries=" + maxRetries +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
new file mode 100644
index 0000000..0a6b126
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.List;
+
+public class KafkaSpoutMessageId {
+ private transient TopicPartition topicPart;
+ private transient long offset;
+ private transient List<Object> tuple;
+ private transient int numFails = 0;
+
+ public KafkaSpoutMessageId(ConsumerRecord consumerRecord, List<Object> tuple) {
+ this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple);
+ }
+
+ public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) {
+ this.topicPart = topicPart;
+ this.offset = offset;
+ this.tuple = tuple;
+ }
+
+ public int partition() {
+ return topicPart.partition();
+ }
+
+ public String topic() {
+ return topicPart.topic();
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public int numFails() {
+ return numFails;
+ }
+
+ public void incrementNumFails() {
+ ++numFails;
+ }
+
+ public TopicPartition getTopicPartition() {
+ return topicPart;
+ }
+
+ public List<Object> getTuple() {
+ return Collections.unmodifiableList(tuple);
+ }
+
+ public String getMetadata(Thread currThread) {
+ return "{" +
+ "topic-partition=" + topicPart +
+ ", offset=" + offset +
+ ", numFails=" + numFails +
+ ", thread='" + currThread.getName() + "'" +
+ '}';
+ }
+
+ @Override
+ public String toString() {
+ return "{" +
+ "topic-partition=" + topicPart +
+ ", offset=" + offset +
+ ", numFails=" + numFails +
+ ", tuple=" + tuple +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KafkaSpoutMessageId messageId = (KafkaSpoutMessageId) o;
+ if (offset != messageId.offset) {
+ return false;
+ }
+ return topicPart.equals(messageId.topicPart);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = topicPart.hashCode();
+ result = 31 * result + (int) (offset ^ (offset >>> 32));
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
new file mode 100644
index 0000000..43464a9
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+
+/**
+ * Represents the stream and output fields used by a topic
+ */
+public class KafkaSpoutStream implements Serializable {
+ private final Fields outputFields;
+ private final String streamId;
+ private final String topic;
+
+ /** Declare specified outputFields with default stream for the specified topic */
+ KafkaSpoutStream(Fields outputFields, String topic) {
+ this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
+ }
+
+ /** Declare specified outputFields with specified stream for the specified topic */
+ KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+ this.outputFields = outputFields;
+ this.streamId = streamId;
+ this.topic = topic;
+ }
+
+ public Fields getOutputFields() {
+ return outputFields;
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaSpoutStream{" +
+ "outputFields=" + outputFields +
+ ", streamId='" + streamId + '\'' +
+ ", topic='" + topic + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
new file mode 100644
index 0000000..30215d1
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreams.class);
+
+ private final Map<String, KafkaSpoutStream> topicToStream;
+
+ private KafkaSpoutStreams(Builder builder) {
+ this.topicToStream = builder.topicToStream;
+ LOG.debug("Built {}", this);
+ }
+
+ /**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+ public Fields getOutputFields(String topic) {
+ if (topicToStream.containsKey(topic)) {
+ final Fields outputFields = topicToStream.get(topic).getOutputFields();
+ LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields);
+ return outputFields;
+ }
+ throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+ }
+
+ /**
+ * @param topic the topic to for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+ public String getStreamId(String topic) {
+ if (topicToStream.containsKey(topic)) {
+ final String streamId = topicToStream.get(topic).getStreamId();
+ LOG.trace("Topic [{}] emitting in stream [{}]", topic, streamId);
+ return streamId;
+ }
+ throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+ }
+
+ /**
+ * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}
+ */
+ public List<String> getTopics() {
+ return new ArrayList<>(topicToStream.keySet());
+ }
+
+ void declareOutputFields(OutputFieldsDeclarer declarer) {
+ for (KafkaSpoutStream stream : topicToStream.values()) {
+ if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
+ declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
+ LOG.debug("Declared " + stream);
+ }
+ }
+ }
+
+ void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) {
+ collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaSpoutStreams{" +
+ "topicToStream=" + topicToStream +
+ '}';
+ }
+
+ public static class Builder {
+ private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();;
+
+ /**
+ * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified.
+ * All the topics will have the same stream id and output fields.
+ */
+ public Builder(Fields outputFields, String... topics) {
+ this(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+ }
+
+ /**
+ * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified.
+ * All the topics will have the same stream id and output fields.
+ */
+ public Builder (Fields outputFields, String streamId, String... topics) {
+ for (String topic : topics) {
+ topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
+ }
+ }
+
+ /**
+ * Adds this stream to the state representing the streams associated with each topic
+ */
+ public Builder(KafkaSpoutStream stream) {
+ topicToStream.put(stream.getTopic(), stream);
+ }
+
+ /**
+ * Adds this stream to the state representing the streams associated with each topic
+ */
+ public Builder addStream(KafkaSpoutStream stream) {
+ topicToStream.put(stream.getTopic(), stream);
+ return this;
+ }
+
+ /**
+ * Please refer to javadoc in {@link #Builder(Fields, String...)}
+ */
+ public Builder addStream(Fields outputFields, String... topics) {
+ for (String topic : topics) {
+ topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic));
+ }
+ return this;
+ }
+
+ /**
+ * Please refer to javadoc in {@link #Builder(Fields, String, String...)}
+ */
+ public Builder addStream(Fields outputFields, String streamId, String... topics) {
+ for (String topic : topics) {
+ topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
+ }
+ return this;
+ }
+
+ public KafkaSpoutStreams build() {
+ return new KafkaSpoutStreams(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
new file mode 100644
index 0000000..45aab48
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface KafkaSpoutTupleBuilder<K,V> extends Serializable {
+ List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
new file mode 100644
index 0000000..4fcc3ef
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutStreams;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM;
+
+public class KafkaSpoutTopologyMain {
+ private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
+ private static final String[] TOPICS = new String[]{"test","test1","test2"};
+
+
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
+ } else {
+ submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig());
+ }
+ }
+
+ protected static void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, topology);
+ stopWaitingForInput();
+ }
+
+ protected static void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception {
+ StormSubmitter.submitTopology(arg, config, topology);
+ }
+
+ private static void stopWaitingForInput() {
+ try {
+ System.out.println("PRESS ENTER TO STOP");
+ new BufferedReader(new InputStreamReader(System.in)).readLine();
+ System.exit(0);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected static Config getConfig() {
+ Config config = new Config();
+ config.setDebug(true);
+ return config;
+ }
+
+ public static StormTopology getTopolgyKafkaSpout() {
+ final TopologyBuilder tp = new TopologyBuilder();
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1);
+ tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+ tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+ return tp.createTopology();
+ }
+
+ public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
+ return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams)
+ .setOffsetCommitPeriodMs(10_000)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setPollStrategy(STREAM)
+ .setMaxUncommittedOffsets(250)
+ .build();
+ }
+
+ public static Map<String,Object> getKafkaConsumerProps() {
+ Map<String, Object> props = new HashMap<>();
+// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
+ props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
+ props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+ props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ return props;
+ }
+
+ public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() {
+ return new KafkaRecordTupleBuilder<>();
+ }
+
+ public static KafkaSpoutStreams getKafkaSpoutStreams() {
+ final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+ final Fields outputFields1 = new Fields("topic", "partition", "offset");
+ return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream
+ .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream
+ .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents topic test2 sent to test_stream2
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
new file mode 100644
index 0000000..c9ff9d5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KafkaTestBolt extends BaseRichBolt {
+ protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class);
+
+
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ LOG.debug("input = [" + input + "]");
+ collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 43b7796..763c15f 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -118,26 +118,12 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.1</version>
- <!-- use provided scope, so users can pull in whichever scala version they choose -->
+ <artifactId>${kafka.artifact.id}</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>0.8.2.1</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-solr/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml
index d093ae8..ba79ddc 100644
--- a/external/storm-solr/pom.xml
+++ b/external/storm-solr/pom.xml
@@ -31,10 +31,10 @@
<developers>
<developer>
- <id>Hugo-Louro</id>
- <name>Hugo Louro</name>
- <email>hmclouro@gmail.com</email>
- </developer>
+ <id>hmcl</id>
+ <name>Hugo Louro</name>
+ <email>hmclouro@gmail.com</email>
+ </developer>
</developers>
<dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 12e5a9f..1a899b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,7 +82,7 @@
<roles>
<role>Committer</role>
</roles>
- <timezone />
+ <timezone/>
</developer>
<developer>
<id>afeng</id>
@@ -109,7 +109,7 @@
<roles>
<role>Committer</role>
</roles>
- <timezone />
+ <timezone/>
</developer>
<developer>
<id>jjackson</id>
@@ -249,6 +249,8 @@
<calcite.version>1.4.0-incubating</calcite.version>
<jackson.version>2.6.3</jackson.version>
<maven-surefire.version>2.18.1</maven-surefire.version>
+ <kafka.version>0.9.0.1</kafka.version>
+ <kafka.artifact.id>kafka_2.11</kafka.artifact.id>
<!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
<java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
@@ -282,6 +284,7 @@
<module>external/storm-mongodb</module>
<module>examples/storm-starter</module>
<module>storm-clojure</module>
+ <module>external/storm-kafka-client</module>
</modules>
<dependencies>
@@ -673,14 +676,14 @@
<version>${ring-json.version}</version>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- <version>${jetty.version}</version>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>${jetty.version}</version>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlets</artifactId>
- <version>${jetty.version}</version>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlets</artifactId>
+ <version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -831,7 +834,7 @@
<version>${thrift.version}</version>
<scope>compile</scope>
</dependency>
- <!-- used by examples/storm-starter -->
+ <!-- used by examples/storm-starter -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -839,14 +842,38 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.calcite</groupId>
- <artifactId>calcite-core</artifactId>
- <version>${calcite.version}</version>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ <version>${calcite.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>${kafka.artifact.id}</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>uk.org.lidalia</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 7f0da6f..648640e 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -317,6 +317,20 @@
<include>README.*</include>
</includes>
</fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-kafka-client/target</directory>
+ <outputDirectory>external/storm-kafka-client</outputDirectory>
+ <includes>
+ <include>storm*jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../../external/storm-kafka-client</directory>
+ <outputDirectory>external/storm-kafka-client</outputDirectory>
+ <includes>
+ <include>README.*</include>
+ </includes>
+ </fileSet>
<!-- $STORM_HOME/extlib -->
<fileSet>