Posted to commits@storm.apache.org by pt...@apache.org on 2017/02/15 21:38:14 UTC
[1/3] storm git commit: STORM-2250: Kafka spout refactoring to increase modularity and testability. Also support nanoseconds in Storm time simulation
Repository: storm
Updated Branches:
refs/heads/master db4695d2d -> d235a0c1b
STORM-2250: Kafka spout refactoring to increase modularity and testability. Also support nanoseconds in Storm time simulation
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e75016c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e75016c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e75016c
Branch: refs/heads/master
Commit: 6e75016c45c602c874086dea26324ca413f0c141
Parents: db4695d
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Feb 14 21:31:45 2017 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Tue Feb 14 21:31:45 2017 +0100
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 9 +-
.../apache/storm/kafka/spout/KafkaSpout.java | 159 ++-------
.../kafka/spout/internal/OffsetManager.java | 157 +++++++++
.../storm/kafka/spout/internal/Timer.java | 7 +-
.../spout/ByTopicRecordTranslatorTest.java | 2 +-
.../spout/DefaultRecordTranslatorTest.java | 2 +-
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 4 +-
.../kafka/spout/KafkaSpoutRebalanceTest.java | 82 ++---
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 334 ++++++++++---------
.../test/KafkaSpoutTopologyMainNamedTopics.java | 6 +-
.../KafkaSpoutTopologyMainWildcardTopics.java | 2 +-
pom.xml | 1 -
.../src/jvm/org/apache/storm/utils/Time.java | 146 +++++---
13 files changed, 524 insertions(+), 387 deletions(-)
----------------------------------------------------------------------
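[Editor's note, not part of the commit: a minimal sketch of driving the new nanosecond-granularity time simulation, using only methods that appear in the Time.java diff below (SimulatedTime, nanoTime, advanceTimeNanos). The class name is for illustration only.]

import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;

public class SimulatedNanosExample {
    public static void main(String[] args) {
        // Entering the try block switches Time into simulation mode;
        // closing it restores pass-through to System.nanoTime()/currentTimeMillis()
        try (SimulatedTime simulatedTime = new SimulatedTime()) {
            long before = Time.nanoTime();     // simulated clock starts at 0
            Time.advanceTimeNanos(1_500_000);  // advance by 1.5 ms
            long after = Time.nanoTime();
            System.out.println(after - before); // prints 1500000
        }
    }
}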
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 97ed359..0fdb64d 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -77,7 +77,13 @@
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-all</artifactId>
+ <artifactId>hamcrest-core</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
@@ -90,7 +96,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
- <version>${log4j-over-slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index b96f3f9..f8a576c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -25,16 +25,13 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -48,6 +45,7 @@ import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -58,19 +56,19 @@ import org.slf4j.LoggerFactory;
public class KafkaSpout<K, V> extends BaseRichSpout {
private static final long serialVersionUID = 4151921085047987154L;
+ //Initial delay for the commit and subscription refresh timers
+ public static final long TIMER_DELAY_MS = 500;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
- private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
// Storm
protected SpoutOutputCollector collector;
// Kafka
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
- private final KafkaConsumerFactory kafkaConsumerFactory;
+ private KafkaConsumerFactory kafkaConsumerFactory;
private transient KafkaConsumer<K, V> kafkaConsumer;
private transient boolean consumerAutoCommitMode;
-
// Bookkeeping
private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation
private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
@@ -78,7 +76,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
- transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate. Not used if it's AutoCommitMode
+ private transient Map<TopicPartition, OffsetManager> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode
@@ -87,13 +85,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
- this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+ this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
}
//This constructor is here for testing
KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
- this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.kafkaSpoutConfig = kafkaSpoutConfig;
}
@Override
@@ -114,9 +112,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
retryService = kafkaSpoutConfig.getRetryService();
if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually
- commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+ commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
}
- refreshSubscriptionTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+ refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
acked = new HashMap<>();
emitted = new HashSet<>();
@@ -198,7 +196,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void setAcked(TopicPartition tp, long fetchOffset) {
// If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
- acked.put(tp, new OffsetEntry(tp, fetchOffset));
+ acked.put(tp, new OffsetManager(tp, fetchOffset));
}
}
@@ -290,7 +288,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
} else {
- kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to last committed offset
+ kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1); // Seek to last committed offset
}
}
}
@@ -347,7 +345,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void commitOffsetsForAckedTuples() {
// Find offsets that are ready to be committed for every topic partition
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
- for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+ for (Map.Entry<TopicPartition, OffsetManager> tpOffset : acked.entrySet()) {
final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -360,9 +358,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
// Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
// in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
- for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
- final OffsetEntry offsetEntry = tpOffset.getValue();
- offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
+ //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
+ final TopicPartition tp = tpOffset.getKey();
+ final OffsetManager offsetManager = acked.get(tp);
+ long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
+ numUncommittedOffsets -= numCommittedOffsets;
+ LOG.debug("[{}] uncommitted offsets across all topic partitions",
+ numUncommittedOffsets);
}
} else {
LOG.trace("No offsets to commit. {}", this);
@@ -483,127 +486,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private String getTopicsString() {
return kafkaSpoutConfig.getSubscription().getTopicsString();
}
+}
- // ======= Offsets Commit Management ==========
-
- private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
- public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
- return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
- }
- }
-
- /**
- * This class is not thread safe
- */
- class OffsetEntry {
- private final TopicPartition tp;
- private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
- * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
- private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1
- private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); // acked messages sorted by ascending order of offset
-
- public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
- this.tp = tp;
- this.initialFetchOffset = initialFetchOffset;
- this.committedOffset = initialFetchOffset - 1;
- LOG.debug("Instantiated {}", this);
- }
-
- public void add(KafkaSpoutMessageId msgId) { // O(Log N)
- ackedMsgs.add(msgId);
- }
-
- /**
- * An offset is only committed when all records with lower offset have
- * been acked. This guarantees that all offsets smaller than the
- * committedOffset have been delivered.
- * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
- */
- public OffsetAndMetadata findNextCommitOffset() {
- boolean found = false;
- long currOffset;
- long nextCommitOffset = committedOffset;
- KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
-
- for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
- if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit
- found = true;
- nextCommitMsg = currAckedMsg;
- nextCommitOffset = currOffset;
- } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
- LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
- break;
- } else {
- //Received a redundant ack. Ignore and continue processing.
- LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
- tp, currOffset, committedOffset);
- }
- }
-
- OffsetAndMetadata nextCommitOffsetAndMetadata = null;
- if (found) {
- nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
- LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
- } else {
- LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
- }
- LOG.trace("{}", this);
- return nextCommitOffsetAndMetadata;
- }
-
- /**
- * Marks an offset has committed. This method has side effects - it sets the internal state in such a way that future
- * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any.
- *
- * @param committedOffset offset to be marked as committed
- */
- public void commit(OffsetAndMetadata committedOffset) {
- long numCommittedOffsets = 0;
- if (committedOffset != null) {
- final long oldCommittedOffset = this.committedOffset;
- numCommittedOffsets = committedOffset.offset() - this.committedOffset;
- this.committedOffset = committedOffset.offset();
- for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
- if (iterator.next().offset() <= committedOffset.offset()) {
- iterator.remove();
- } else {
- break;
- }
- }
- numUncommittedOffsets-= numCommittedOffsets;
- LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
- oldCommittedOffset + 1, this.committedOffset, numCommittedOffsets, tp, numUncommittedOffsets);
- } else {
- LOG.debug("Committed [{}] offsets for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
- numCommittedOffsets, tp, numUncommittedOffsets);
- }
- LOG.trace("{}", this);
- }
-
- long getCommittedOffset() {
- return committedOffset;
- }
-
- public boolean isEmpty() {
- return ackedMsgs.isEmpty();
- }
- public boolean contains(ConsumerRecord<K, V> record) {
- return contains(new KafkaSpoutMessageId(record));
- }
-
- public boolean contains(KafkaSpoutMessageId msgId) {
- return ackedMsgs.contains(msgId);
- }
- @Override
- public String toString() {
- return "OffsetEntry{" +
- "topic-partition=" + tp +
- ", fetchOffset=" + initialFetchOffset +
- ", committedOffset=" + committedOffset +
- ", ackedMsgs=" + ackedMsgs +
- '}';
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
new file mode 100755
index 0000000..4ce0471
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages acked and committed offsets for a TopicPartition. This class is not thread safe
+ */
+public class OffsetManager {
+
+ private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
+ private static final Logger LOG = LoggerFactory.getLogger(OffsetManager.class);
+ private final TopicPartition tp;
+ /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
+ * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
+ private final long initialFetchOffset;
+ // Last offset committed to Kafka. Initially it is set to fetchOffset - 1
+ private long committedOffset;
+ // Acked messages sorted by ascending order of offset
+ private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);
+
+ public OffsetManager(TopicPartition tp, long initialFetchOffset) {
+ this.tp = tp;
+ this.initialFetchOffset = initialFetchOffset;
+ this.committedOffset = initialFetchOffset - 1;
+ LOG.debug("Instantiated {}", this);
+ }
+
+ public void add(KafkaSpoutMessageId msgId) { // O(Log N)
+ ackedMsgs.add(msgId);
+ }
+
+ /**
+ * An offset is only committed when all records with lower offset have been
+ * acked. This guarantees that all offsets smaller than the committedOffset
+ * have been delivered.
+ *
+ * @return the next OffsetAndMetadata to commit, or null if no offset is
+ * ready to commit.
+ */
+ public OffsetAndMetadata findNextCommitOffset() {
+ boolean found = false;
+ long currOffset;
+ long nextCommitOffset = committedOffset;
+ KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
+
+ for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
+ if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit
+ found = true;
+ nextCommitMsg = currAckedMsg;
+ nextCommitOffset = currOffset;
+ } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
+ LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
+ break;
+ } else {
+ //Received a redundant ack. Ignore and continue processing.
+ LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
+ tp, currOffset, committedOffset);
+ }
+ }
+
+ OffsetAndMetadata nextCommitOffsetAndMetadata = null;
+ if (found) {
+ nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
+ LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
+ } else {
+ LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
+ }
+ LOG.trace("{}", this);
+ return nextCommitOffsetAndMetadata;
+ }
+
+ /**
+ * Marks an offset as committed. This method has side effects - it sets the
+ * internal state in such a way that future calls to
+ * {@link #findNextCommitOffset()} will return offsets greater than the
+ * offset specified, if any.
+ *
+ * @param committedOffset offset to be marked as committed
+ * @return Number of offsets committed in this commit
+ */
+ public long commit(OffsetAndMetadata committedOffset) {
+ long preCommitCommittedOffsets = this.committedOffset;
+ long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+ this.committedOffset = committedOffset.offset();
+ for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
+ if (iterator.next().offset() <= committedOffset.offset()) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+ LOG.trace("{}", this);
+
+ LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
+ preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp);
+
+ return numCommittedOffsets;
+ }
+
+ public long getCommittedOffset() {
+ return committedOffset;
+ }
+
+ public boolean isEmpty() {
+ return ackedMsgs.isEmpty();
+ }
+
+ public boolean contains(ConsumerRecord record) {
+ return contains(new KafkaSpoutMessageId(record));
+ }
+
+ public boolean contains(KafkaSpoutMessageId msgId) {
+ return ackedMsgs.contains(msgId);
+ }
+
+ @Override
+ public String toString() {
+ return "OffsetManager{"
+ + "topic-partition=" + tp
+ + ", fetchOffset=" + initialFetchOffset
+ + ", committedOffset=" + committedOffset
+ + ", ackedMsgs=" + ackedMsgs
+ + '}';
+ }
+
+ private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
+
+ @Override
+ public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
+ return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
+ }
+ }
+}
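[Editor's note, not part of the commit: a minimal sketch of the extracted OffsetManager's contract - an offset only becomes committable once all lower offsets have been acked. It assumes KafkaSpoutMessageId has a (TopicPartition, long) constructor; the example class name is hypothetical.]

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.apache.storm.kafka.spout.internal.OffsetManager;

public class OffsetManagerExample {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("test-topic", 0);
        OffsetManager manager = new OffsetManager(tp, 0); // committedOffset starts at -1

        // Ack offsets 0 and 2, leaving a gap at offset 1 (assumed constructor)
        manager.add(new KafkaSpoutMessageId(tp, 0));
        manager.add(new KafkaSpoutMessageId(tp, 2));

        OffsetAndMetadata next = manager.findNextCommitOffset();
        // Only offset 0 is ready; offset 2 must wait until offset 1 is acked
        System.out.println(next.offset());                // 0
        System.out.println(manager.commit(next));         // 1 offset committed
        System.out.println(manager.getCommittedOffset()); // 0
    }
}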
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
index d51104d..2a2e1cb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -18,6 +18,7 @@
package org.apache.storm.kafka.spout.internal;
import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.Time;
public class Timer {
private final long delay;
@@ -41,7 +42,7 @@ public class Timer {
this.timeUnit = timeUnit;
periodNanos = timeUnit.toNanos(period);
- start = System.nanoTime() + timeUnit.toNanos(delay);
+ start = Time.nanoTime() + timeUnit.toNanos(delay);
}
public long period() {
@@ -65,9 +66,9 @@ public class Timer {
* otherwise.
*/
public boolean isExpiredResetOnTrue() {
- final boolean expired = System.nanoTime() - start > periodNanos;
+ final boolean expired = Time.nanoTime() - start >= periodNanos;
if (expired) {
- start = System.nanoTime();
+ start = Time.nanoTime();
}
return expired;
}
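[Editor's note, not part of the commit: since Timer now reads Time.nanoTime() instead of System.nanoTime(), timer expiry can be exercised deterministically from a test. A minimal sketch, with an illustrative class name; note the diff also relaxes the expiry check from > to >=, so an advance of exactly delay + period expires the timer.]

import java.util.concurrent.TimeUnit;
import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;

public class TimerSimulationExample {
    public static void main(String[] args) {
        try (SimulatedTime simulatedTime = new SimulatedTime()) {
            // 500 ms initial delay, 2000 ms period, as KafkaSpout configures its timers
            Timer timer = new Timer(500, 2_000, TimeUnit.MILLISECONDS);
            System.out.println(timer.isExpiredResetOnTrue()); // false, no simulated time has passed
            Time.advanceTime(2_500);                          // cover delay + period
            System.out.println(timer.isExpiredResetOnTrue()); // true, and the timer resets
        }
    }
}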
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
index fd53b15..1e4b43b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.storm.kafka.spout;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.HashSet;
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
index f4275e4..681953d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.storm.kafka.spout;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import java.util.Arrays;
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 08220dd..57e0120 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -17,7 +17,9 @@
*/
package org.apache.storm.kafka.spout;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.util.HashMap;
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 68fd4a6..b882b67 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -44,6 +45,8 @@ import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -55,20 +58,18 @@ public class KafkaSpoutRebalanceTest {
@Captor
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
- private TopologyContext contextMock;
- private SpoutOutputCollector collectorMock;
- private Map<String, Object> conf;
+ private final long offsetCommitPeriodMs = 2_000;
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
private KafkaConsumer<String, String> consumerMock;
- private KafkaConsumerFactory<String, String> consumerFactoryMock;
+ private KafkaConsumerFactory<String, String> consumerFactory;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- contextMock = mock(TopologyContext.class);
- collectorMock = mock(SpoutOutputCollector.class);
- conf = new HashMap<>();
consumerMock = mock(KafkaConsumer.class);
- consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+ consumerFactory = (kafkaSpoutConfig) -> consumerMock;
}
//Returns messageIds in order of emission
@@ -93,9 +94,9 @@ public class KafkaSpoutRebalanceTest {
Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(firstPartitionRecords))
- .thenReturn(new ConsumerRecords(secondPartitionRecords))
- .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+ .thenReturn(new ConsumerRecords(firstPartitionRecords))
+ .thenReturn(new ConsumerRecords(secondPartitionRecords))
+ .thenReturn(new ConsumerRecords(Collections.emptyMap()));
//Emit the messages
spout.nextTuple();
@@ -109,7 +110,7 @@ public class KafkaSpoutRebalanceTest {
//Now rebalance
consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
-
+
List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
emittedMessageIds.add(messageIdForRevokedPartition.getValue());
emittedMessageIds.add(messageIdForAssignedPartition.getValue());
@@ -119,47 +120,48 @@ public class KafkaSpoutRebalanceTest {
@Test
public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10), consumerFactoryMock);
- String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
- TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
- TopicPartition assignedPartition = new TopicPartition(topic, 2);
-
- //Emit a message on each partition and revoke the first partition
- List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
-
- //Ack both emitted tuples
- spout.ack(emittedMessageIds.get(0));
- spout.ack(emittedMessageIds.get(1));
-
- //Ensure the commit timer has expired
- Thread.sleep(510);
-
- //Make the spout commit any acked tuples
- spout.nextTuple();
- //Verify that it only committed the message on the assigned partition
- verify(consumerMock).commitSync(commitCapture.capture());
-
- Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
- assertThat(commitCaptureMap, hasKey(assignedPartition));
- assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, this.offsetCommitPeriodMs), consumerFactory);
+ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Ack both emitted tuples
+ spout.ack(emittedMessageIds.get(0));
+ spout.ack(emittedMessageIds.get(1));
+
+ //Ensure the commit timer has expired
+ Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Make the spout commit any acked tuples
+ spout.nextTuple();
+ //Verify that it only committed the message on the assigned partition
+ verify(consumerMock, times(1)).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+ assertThat(commitCaptureMap, hasKey(assignedPartition));
+ assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ }
}
-
+
@Test
public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactoryMock);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactory);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
-
+
//Emit a message on each partition and revoke the first partition
List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
-
+
//Fail both emitted tuples
spout.fail(emittedMessageIds.get(0));
spout.fail(emittedMessageIds.get(1));
-
+
//Check that only the tuple on the currently assigned partition is retried
verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
verify(retryServiceMock).schedule(emittedMessageIds.get(1));
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index c5e4e31..fdc9734 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.storm.kafka.spout;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+
import info.batey.kafka.unit.KafkaUnitRule;
import kafka.producer.KeyedMessage;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -28,21 +30,39 @@ import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import static org.junit.Assert.*;
-
import java.util.Map;
import java.util.stream.IntStream;
-import static org.mockito.Mockito.*;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
public class SingleTopicKafkaSpoutTest {
private class SpoutContext {
+
public KafkaSpout<String, String> spout;
public SpoutOutputCollector collector;
public SpoutContext(KafkaSpout<String, String> spout,
- SpoutOutputCollector collector) {
+ SpoutOutputCollector collector) {
this.spout = spout;
this.collector = collector;
}
@@ -51,190 +71,206 @@ public class SingleTopicKafkaSpoutTest {
@Rule
public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
- void populateTopicData(String topicName, int msgCount) {
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private final TopologyContext topologyContext = mock(TopologyContext.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+ private final long commitOffsetPeriodMs = 2_000;
+ private KafkaConsumer<String, String> consumerSpy;
+ private KafkaConsumerFactory<String, String> consumerFactory;
+ private KafkaSpout<String, String> spout;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs);
+ this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
+ this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
+ this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+ }
+
+ private void populateTopicData(String topicName, int msgCount) {
kafkaUnitRule.getKafkaUnit().createTopic(topicName);
IntStream.range(0, msgCount).forEach(value -> {
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
- topicName, Integer.toString(value),
- Integer.toString(value));
+ topicName, Integer.toString(value),
+ Integer.toString(value));
kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
});
}
- SpoutContext initializeSpout(int msgCount) {
+ private void initializeSpout(int msgCount) {
populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
- int kafkaPort = kafkaUnitRule.getKafkaPort();
-
- TopologyContext topology = mock(TopologyContext.class);
- SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
- Map conf = mock(Map.class);
-
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(kafkaPort));
- spout.open(conf, topology, collector);
+ spout.open(conf, topologyContext, collector);
spout.activate();
- return new SpoutContext(spout, collector);
}
+
/*
- * Asserts that the next possible offset to commit or the committed offset is the provided offset.
- * An offset that is ready to be committed is not guarenteed to be already committed.
+ * Asserts that commitSync has been called once,
+ * that there are only commits on one topic,
+ * and that the committed offset covers messageCount messages
*/
- private void assertOffsetCommitted(int offset, KafkaSpout.OffsetEntry entry) {
-
- boolean currentOffsetMatch = entry.getCommittedOffset() == offset;
- OffsetAndMetadata nextOffset = entry.findNextCommitOffset();
- boolean nextOffsetMatch = nextOffset != null && nextOffset.offset() == offset;
- assertTrue("Next offset: " +
- entry.findNextCommitOffset() +
- " OR current offset: " +
- entry.getCommittedOffset() +
- " must equal desired offset: " +
- offset,
- currentOffsetMatch | nextOffsetMatch);
+ private void verifyAllMessagesCommitted(long messageCount) {
+ verify(consumerSpy, times(1)).commitSync(commitCapture.capture());
+ Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+ assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
+ OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
+ assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount - 1));
}
@Test
public void shouldContinueWithSlowDoubleAcks() throws Exception {
- int messageCount = 20;
- SpoutContext context = initializeSpout(messageCount);
-
- //play 1st tuple
- ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
- context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
- context.spout.ack(messageIdToDoubleAck.getValue());
-
- IntStream.range(0, messageCount/2).forEach(value -> {
- context.spout.nextTuple();
- });
-
- context.spout.ack(messageIdToDoubleAck.getValue());
-
- IntStream.range(0, messageCount).forEach(value -> {
- context.spout.nextTuple();
- });
-
- ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
-
- verify(context.collector, times(messageCount)).emit(
- eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ int messageCount = 20;
+ initializeSpout(messageCount);
+
+ //play 1st tuple
+ ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+ spout.ack(messageIdToDoubleAck.getValue());
+
+ //Emit some more messages
+ IntStream.range(0, messageCount / 2).forEach(value -> {
+ spout.nextTuple();
+ });
+
+ spout.ack(messageIdToDoubleAck.getValue());
+
+ //Emit any remaining messages
+ IntStream.range(0, messageCount).forEach(value -> {
+ spout.nextTuple();
+ });
+
+ //Verify that all messages are emitted, ack all the messages
+ ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class);
+ verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
anyObject(),
- remainingIds.capture());
- remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ messageIds.capture());
+ messageIds.getAllValues().iterator().forEachRemaining(spout::ack);
- context.spout.acked.values().forEach(item -> {
- assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(messageCount);
+ }
}
@Test
public void shouldEmitAllMessages() throws Exception {
- int messageCount = 10;
- SpoutContext context = initializeSpout(messageCount);
-
-
- IntStream.range(0, messageCount).forEach(value -> {
- context.spout.nextTuple();
- ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
- verify(context.collector).emit(
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ int messageCount = 10;
+ initializeSpout(messageCount);
+
+ //Emit all messages and check that they are emitted. Ack the messages too
+ IntStream.range(0, messageCount).forEach(value -> {
+ spout.nextTuple();
+ ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+ verify(collector).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
- Integer.toString(value),
- Integer.toString(value))),
- messageId.capture());
- context.spout.ack(messageId.getValue());
- reset(context.collector);
- });
-
- context.spout.acked.values().forEach(item -> {
- assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ Integer.toString(value),
+ Integer.toString(value))),
+ messageId.capture());
+ spout.ack(messageId.getValue());
+ reset(collector);
+ });
+
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(messageCount);
+ }
}
@Test
public void shouldReplayInOrderFailedMessages() throws Exception {
- int messageCount = 10;
- SpoutContext context = initializeSpout(messageCount);
-
- //play and ack 1 tuple
- ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
- context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
- context.spout.ack(messageIdAcked.getValue());
- reset(context.collector);
-
- //play and fail 1 tuple
- ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
- context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
- context.spout.fail(messageIdFailed.getValue());
- reset(context.collector);
-
- //pause so that failed tuples will be retried
- Thread.sleep(200);
-
-
- //allow for some calls to nextTuple() to fail to emit a tuple
- IntStream.range(0, messageCount + 5).forEach(value -> {
- context.spout.nextTuple();
- });
-
- ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
-
- //1 message replayed, messageCount - 2 messages emitted for the first time
- verify(context.collector, times(messageCount - 1)).emit(
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ int messageCount = 10;
+ initializeSpout(messageCount);
+
+ //play and ack 1 tuple
+ ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
+ spout.ack(messageIdAcked.getValue());
+ reset(collector);
+
+ //play and fail 1 tuple
+ ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+ spout.fail(messageIdFailed.getValue());
+ reset(collector);
+
+ //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+ IntStream.range(0, messageCount).forEach(value -> {
+ spout.nextTuple();
+ });
+
+ ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
+ //All messages except the first acked message should have been emitted
+ verify(collector, times(messageCount - 1)).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
anyObject(),
remainingMessageIds.capture());
- remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ remainingMessageIds.getAllValues().iterator().forEachRemaining(spout::ack);
- context.spout.acked.values().forEach(item -> {
- assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(messageCount);
+ }
}
@Test
public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
- int messageCount = 10;
- SpoutContext context = initializeSpout(messageCount);
-
-
- //play 1st tuple
- ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
- context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
- reset(context.collector);
-
- //play 2nd tuple
- ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
- context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
- reset(context.collector);
-
- //ack 2nd tuple
- context.spout.ack(messageIdToAck.getValue());
- //fail 1st tuple
- context.spout.fail(messageIdToFail.getValue());
-
- //pause so that failed tuples will be retried
- Thread.sleep(200);
-
- //allow for some calls to nextTuple() to fail to emit a tuple
- IntStream.range(0, messageCount + 5).forEach(value -> {
- context.spout.nextTuple();
- });
-
- ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
- //1 message replayed, messageCount - 2 messages emitted for the first time
- verify(context.collector, times(messageCount - 1)).emit(
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ int messageCount = 10;
+ initializeSpout(messageCount);
+
+ //play 1st tuple
+ ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
+ reset(collector);
+
+ //play 2nd tuple
+ ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
+ reset(collector);
+
+ //ack 2nd tuple
+ spout.ack(messageIdToAck.getValue());
+ //fail 1st tuple
+ spout.fail(messageIdToFail.getValue());
+
+ //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+ IntStream.range(0, messageCount).forEach(value -> {
+ spout.nextTuple();
+ });
+
+ ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+ //All messages except the first acked message should have been emitted
+ verify(collector, times(messageCount - 1)).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
anyObject(),
remainingIds.capture());
- remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ remainingIds.getAllValues().iterator().forEachRemaining(spout::ack);
- context.spout.acked.values().forEach(item -> {
- assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(messageCount);
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 2aeeb95..e305c8a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -50,9 +50,9 @@ public class KafkaSpoutTopologyMainNamedTopics {
protected void runMain(String[] args) throws Exception {
if (args.length == 0) {
- submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
+ submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
} else {
- submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig());
+ submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig());
}
}
@@ -82,7 +82,7 @@ public class KafkaSpoutTopologyMainNamedTopics {
return config;
}
- protected StormTopology getTopolgyKafkaSpout() {
+ protected StormTopology getTopologyKafkaSpout() {
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt())
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
index d0376e6..f811c7a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -37,7 +37,7 @@ public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMain
new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
}
- protected StormTopology getTopolgyKafkaSpout() {
+ protected StormTopology getTopologyKafkaSpout() {
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1);
tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 13c4c35..6d3543e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -359,7 +359,6 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/6e75016c/storm-core/src/jvm/org/apache/storm/utils/Time.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Time.java b/storm-core/src/jvm/org/apache/storm/utils/Time.java
index c5c6b6a..0401829 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Time.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Time.java
@@ -24,14 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+/**
+ * This class implements time simulation support. When time simulation is enabled, methods on this class will use fixed time.
+ * When time simulation is disabled, methods will pass through to relevant java.lang.System/java.lang.Thread calls.
+ * Methods using units higher than nanoseconds will pass through to System.currentTimeMillis(). Methods supporting nanoseconds will pass through to System.nanoTime().
+ */
public class Time {
private static final Logger LOG = LoggerFactory.getLogger(Time.class);
private static AtomicBoolean simulating = new AtomicBoolean(false);
- private static AtomicLong autoAdvanceOnSleep = new AtomicLong(0);
- private static volatile Map<Thread, AtomicLong> threadSleepTimes;
+ private static AtomicLong autoAdvanceNanosOnSleep = new AtomicLong(0);
+ private static volatile Map<Thread, AtomicLong> threadSleepTimesNanos;
private static final Object sleepTimesLock = new Object();
- private static AtomicLong simulatedCurrTimeMs;
+ private static AtomicLong simulatedCurrTimeNanos;
public static class SimulatedTime implements AutoCloseable {
@@ -39,13 +43,13 @@ public class Time {
this(null);
}
- public SimulatedTime(Number ms) {
+ public SimulatedTime(Number advanceTimeMs) {
synchronized(Time.sleepTimesLock) {
Time.simulating.set(true);
- Time.simulatedCurrTimeMs = new AtomicLong(0);
- Time.threadSleepTimes = new ConcurrentHashMap<>();
- if (ms != null) {
- Time.autoAdvanceOnSleep.set(ms.longValue());
+ Time.simulatedCurrTimeNanos = new AtomicLong(0);
+ Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
+ if (advanceTimeMs != null) {
+ Time.autoAdvanceNanosOnSleep.set(millisToNanos(advanceTimeMs.longValue()));
}
LOG.warn("AutoCloseable Simulated Time Starting...");
}
@@ -55,8 +59,8 @@ public class Time {
public void close() {
synchronized(Time.sleepTimesLock) {
Time.simulating.set(false);
- Time.autoAdvanceOnSleep.set(0);
- Time.threadSleepTimes = null;
+ Time.autoAdvanceNanosOnSleep.set(0);
+ Time.threadSleepTimesNanos = null;
LOG.warn("AutoCloseable Simulated Time Ending...");
}
}
@@ -66,8 +70,8 @@ public class Time {
public static void startSimulating() {
synchronized(Time.sleepTimesLock) {
Time.simulating.set(true);
- Time.simulatedCurrTimeMs = new AtomicLong(0);
- Time.threadSleepTimes = new ConcurrentHashMap<>();
+ Time.simulatedCurrTimeNanos = new AtomicLong(0);
+ Time.threadSleepTimesNanos = new ConcurrentHashMap<>();
LOG.warn("Simulated Time Starting...");
}
}
@@ -76,8 +80,8 @@ public class Time {
public static void stopSimulating() {
synchronized(Time.sleepTimesLock) {
Time.simulating.set(false);
- Time.autoAdvanceOnSleep.set(0);
- Time.threadSleepTimes = null;
+ Time.autoAdvanceNanosOnSleep.set(0);
+ Time.threadSleepTimesNanos = null;
LOG.warn("Simulated Time Ending...");
}
}
@@ -88,44 +92,66 @@ public class Time {
public static void sleepUntil(long targetTimeMs) throws InterruptedException {
if(simulating.get()) {
- try {
- synchronized(sleepTimesLock) {
- if (threadSleepTimes == null) {
+ simulatedSleepUntilNanos(millisToNanos(targetTimeMs));
+ } else {
+ long sleepTimeMs = targetTimeMs - currentTimeMillis();
+ if(sleepTimeMs>0) {
+ Thread.sleep(sleepTimeMs);
+ }
+ }
+ }
+
+ public static void sleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+ if(simulating.get()) {
+ simulatedSleepUntilNanos(targetTimeNanos);
+ } else {
+ long sleepTimeNanos = targetTimeNanos-nanoTime();
+ long sleepTimeMs = nanosToMillis(sleepTimeNanos);
+ int sleepTimeNanosSansMs = (int)(sleepTimeNanos%1_000_000);
+ if(sleepTimeNanos>0) {
+ Thread.sleep(sleepTimeMs, sleepTimeNanosSansMs);
+ }
+ }
+ }
+
+ private static void simulatedSleepUntilNanos(long targetTimeNanos) throws InterruptedException {
+ try {
+ synchronized (sleepTimesLock) {
+ if (threadSleepTimesNanos == null) {
+ LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
+ throw new InterruptedException();
+ }
+ threadSleepTimesNanos.put(Thread.currentThread(), new AtomicLong(targetTimeNanos));
+ }
+ while (simulatedCurrTimeNanos.get() < targetTimeNanos) {
+ synchronized (sleepTimesLock) {
+ if (threadSleepTimesNanos == null) {
LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
throw new InterruptedException();
}
- threadSleepTimes.put(Thread.currentThread(), new AtomicLong(targetTimeMs));
}
- while(simulatedCurrTimeMs.get() < targetTimeMs) {
- synchronized(sleepTimesLock) {
- if (threadSleepTimes == null) {
- LOG.debug("{} is still sleeping after simulated time disabled.", Thread.currentThread(), new RuntimeException("STACK TRACE"));
- throw new InterruptedException();
- }
- }
- long autoAdvance = autoAdvanceOnSleep.get();
- if (autoAdvance > 0) {
- advanceTime(autoAdvance);
- }
- Thread.sleep(10);
+ long autoAdvance = autoAdvanceNanosOnSleep.get();
+ if (autoAdvance > 0) {
+ advanceTimeNanos(autoAdvance);
}
- } finally {
- synchronized(sleepTimesLock) {
- if (simulating.get() && threadSleepTimes != null) {
- threadSleepTimes.remove(Thread.currentThread());
- }
+ Thread.sleep(10);
+ }
+ } finally {
+ synchronized (sleepTimesLock) {
+ if (simulating.get() && threadSleepTimesNanos != null) {
+ threadSleepTimesNanos.remove(Thread.currentThread());
}
}
- } else {
- long sleepTime = targetTimeMs-currentTimeMillis();
- if(sleepTime>0)
- Thread.sleep(sleepTime);
}
}
public static void sleep(long ms) throws InterruptedException {
sleepUntil(currentTimeMillis()+ms);
}
+
+ public static void sleepNanos(long nanos) throws InterruptedException {
+ sleepUntilNanos(nanoTime() + nanos);
+ }
public static void sleepSecs (long secs) throws InterruptedException {
if (secs > 0) {
@@ -133,14 +159,30 @@ public class Time {
}
}
+ public static long nanoTime() {
+ if (simulating.get()) {
+ return simulatedCurrTimeNanos.get();
+ } else {
+ return System.nanoTime();
+ }
+ }
+
public static long currentTimeMillis() {
if(simulating.get()) {
- return simulatedCurrTimeMs.get();
+ return nanosToMillis(simulatedCurrTimeNanos.get());
} else {
return System.currentTimeMillis();
}
}
+ public static long nanosToMillis(long nanos) {
+ return nanos/1_000_000;
+ }
+
+ public static long millisToNanos(long millis) {
+ return millis*1_000_000;
+ }
+
public static long secsToMillis (int secs) {
return 1000*(long) secs;
}
@@ -162,9 +204,17 @@ public class Time {
}
public static void advanceTime(long ms) {
- if (!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
- if (ms < 0) throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
- long newTime = simulatedCurrTimeMs.addAndGet(ms);
+ advanceTimeNanos(millisToNanos(ms));
+ }
+
+ public static void advanceTimeNanos(long nanos) {
+ if (!simulating.get()) {
+ throw new IllegalStateException("Cannot simulate time unless in simulation mode");
+ }
+ if (nanos < 0) {
+ throw new IllegalArgumentException("advanceTime only accepts positive time as an argument");
+ }
+ long newTime = simulatedCurrTimeNanos.addAndGet(nanos);
LOG.debug("Advanced simulated time to {}", newTime);
}
@@ -173,11 +223,13 @@ public class Time {
}
public static boolean isThreadWaiting(Thread t) {
- if(!simulating.get()) throw new IllegalStateException("Must be in simulation mode");
+ if(!simulating.get()) {
+ throw new IllegalStateException("Must be in simulation mode");
+ }
AtomicLong time;
synchronized(sleepTimesLock) {
- time = threadSleepTimes.get(t);
+ time = threadSleepTimesNanos.get(t);
}
- return !t.isAlive() || time!=null && currentTimeMillis() < time.longValue();
+ return !t.isAlive() || time!=null && nanoTime() < time.longValue();
}
}
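
To make the semantics of the new nanosecond clock concrete, here is a minimal usage sketch
against the API introduced in the diff above (SimulatedTime, nanoTime, advanceTimeNanos,
nanosToMillis). The class name SimulatedTimeExample and the specific values are illustrative
only, not part of this commit:

    import org.apache.storm.utils.Time;

    public class SimulatedTimeExample {
        public static void main(String[] args) throws Exception {
            // The Number argument (here 10 ms) configures auto-advance: each time a
            // thread sleeps on the simulated clock, the clock is advanced by this
            // amount, converted to nanoseconds internally via millisToNanos(...).
            try (Time.SimulatedTime simulated = new Time.SimulatedTime(10)) {
                long start = Time.nanoTime();   // reads the simulated nano clock (starts at 0)

                Time.advanceTimeNanos(500_000); // advance by 0.5 ms
                assert Time.nanoTime() - start == 500_000; // run with -ea to check

                // Millisecond accessors truncate: 500_000 ns is still 0 ms.
                assert Time.currentTimeMillis() == Time.nanosToMillis(Time.nanoTime());

                Time.advanceTime(5);            // ms-based advance delegates to advanceTimeNanos
                assert Time.currentTimeMillis() == 5;
            } // close() turns simulation off and resets the auto-advance setting
        }
    }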
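The auto-advance setting and isThreadWaiting(...) exist so a test can coordinate with threads
that block on the simulated clock. A hypothetical sketch of that pattern, assuming only the
methods shown in the diff (the polling loop and class name are illustrative):

    import org.apache.storm.utils.Time;

    public class WaitingThreadExample {
        public static void main(String[] args) throws Exception {
            try (Time.SimulatedTime simulated = new Time.SimulatedTime()) {
                Thread sleeper = new Thread(() -> {
                    try {
                        // Blocks until the simulated clock reaches now + 1000 ms.
                        Time.sleep(1_000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                sleeper.start();

                // Poll until the sleeper has registered its wake-up time; this is
                // a real Thread.sleep, since we are waiting on the real scheduler.
                while (!Time.isThreadWaiting(sleeper)) {
                    Thread.sleep(10);
                }

                Time.advanceTime(1_000); // move the simulated clock; the sleeper wakes
                sleeper.join();
            }
        }
    }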
[2/3] storm git commit: Merge branch 'STORM-2250' of github.com:srdo/storm
Posted by pt...@apache.org.
Merge branch 'STORM-2250' of github.com:srdo/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b8e458f4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b8e458f4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b8e458f4
Branch: refs/heads/master
Commit: b8e458f4a0eb5c79e6099b52a7acf9268b7b655e
Parents: 560ff86 6e75016
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Feb 15 16:33:21 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Feb 15 16:33:21 2017 -0500
----------------------------------------------------------------------
CHANGELOG.md | 2 +
LICENSE | 102 +++++-
docs/storm-kafka-client.md | 22 ++
external/storm-kafka-client/pom.xml | 9 +-
.../apache/storm/kafka/spout/KafkaSpout.java | 200 +++--------
.../kafka/spout/internal/OffsetManager.java | 157 +++++++++
.../storm/kafka/spout/internal/Timer.java | 7 +-
.../spout/ByTopicRecordTranslatorTest.java | 2 +-
.../spout/DefaultRecordTranslatorTest.java | 2 +-
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 4 +-
.../kafka/spout/KafkaSpoutRebalanceTest.java | 82 ++---
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 334 ++++++++++---------
.../test/KafkaSpoutTopologyMainNamedTopics.java | 6 +-
.../KafkaSpoutTopologyMainWildcardTopics.java | 2 +-
pom.xml | 1 -
.../apache/storm/blobstore/BlobStoreUtils.java | 11 +-
.../src/jvm/org/apache/storm/utils/Time.java | 146 +++++---
storm-core/src/ui/public/css/style.css | 7 +
storm-core/src/ui/public/flux.html | 157 +++++++++
storm-core/src/ui/public/images/bolt.png | Bin 0 -> 6019 bytes
storm-core/src/ui/public/images/flux.png | Bin 0 -> 5328 bytes
storm-core/src/ui/public/images/spout.png | Bin 0 -> 5255 bytes
storm-core/src/ui/public/index.html | 7 +-
storm-core/src/ui/public/js/cytoscape-dagre.js | 192 +++++++++++
storm-core/src/ui/public/js/cytoscape.min.js | 63 ++++
storm-core/src/ui/public/js/dagre.min.js | 6 +
storm-core/src/ui/public/js/esprima.min.js | 2 +
storm-core/src/ui/public/js/js-yaml.min.js | 3 +
28 files changed, 1122 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: add STORM-2250 to changelog
Posted by pt...@apache.org.
add STORM-2250 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d235a0c1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d235a0c1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d235a0c1
Branch: refs/heads/master
Commit: d235a0c1b49a4ead966d2288e81058495d2ada44
Parents: b8e458f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Feb 15 16:34:13 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Feb 15 16:34:13 2017 -0500
----------------------------------------------------------------------
CHANGELOG.md | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d235a0c1/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8f731bd..efc3490 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
-## 2.0.0
+\ufeff## 2.0.0
+ * STORM-2250: Kafka Spout Refactoring to Increase Modularity and Testability
* STORM-2346: Files with unapproved licenses: download-rc-directory.sh verify-release-file.sh
* STORM-2350: Storm-HDFS's listFilesByModificationTime is broken
* STORM-1961: Stream api for storm core use cases