You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/09 03:34:47 UTC
[1/2] storm git commit: STORM-3013: Keep KafkaConsumer open when
storm-kafka-client spout is deactivated, in order to keep metrics working
Repository: storm
Updated Branches:
refs/heads/master db86bade0 -> 6872adfc2
STORM-3013: Keep KafkaConsumer open when storm-kafka-client spout is deactivated, in order to keep metrics working
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/528ab30b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/528ab30b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/528ab30b
Branch: refs/heads/master
Commit: 528ab30b5c49dbeae37df4cb9a97c12ee8989c01
Parents: db86bad
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Mar 30 19:12:01 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Jul 6 13:13:46 2018 +0200
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 49 +++++++++---------
.../kafka/spout/metrics/KafkaOffsetMetric.java | 9 ++--
.../kafka/spout/KafkaSpoutAbstractTest.java | 6 ++-
.../kafka/spout/KafkaSpoutReactivationTest.java | 53 ++++++++++++++------
.../kafka/spout/KafkaSpoutSingleTopicTest.java | 2 +-
...outTopologyDeployActivateDeactivateTest.java | 2 -
6 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 607b8a5..f5969af 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -24,7 +24,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
import com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -103,7 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient Timer refreshAssignmentTimer;
private transient TopologyContext context;
private transient CommitMetadataManager commitMetadataManager;
- private transient KafkaOffsetMetric kafkaOffsetMetric;
+ private transient KafkaOffsetMetric<K, V> kafkaOffsetMetric;
private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
@@ -145,6 +144,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
+ kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
+
tupleListener.open(conf, context);
if (canRegisterMetrics()) {
registerMetric();
@@ -155,7 +156,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void registerMetric() {
LOG.info("Registering Spout Metrics");
- kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
+ kafkaOffsetMetric = new KafkaOffsetMetric<>(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
}
@@ -186,7 +187,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
if (isAtLeastOnceProcessing()) {
- commitOffsetsForAckedTuples(new HashSet<>(partitions));
+ commitOffsetsForAckedTuples();
}
}
@@ -277,9 +278,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
if (isAtLeastOnceProcessing()) {
- commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+ commitOffsetsForAckedTuples();
} else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
- Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(kafkaConsumer.assignment());
kafkaConsumer.commitAsync(offsetsToCommit, null);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
@@ -367,7 +368,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is at-most-once.
- Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
createFetchedOffsetsMetadata(kafkaConsumer.assignment());
kafkaConsumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
@@ -497,15 +498,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
return offsetsToCommit;
}
-
- private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
- // Find offsets that are ready to be committed for every assigned topic partition
- final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream()
- .filter(entry -> assignedPartitions.contains(entry.getKey()))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-
+
+ private void commitOffsetsForAckedTuples() {
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
- for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
+ for (Map.Entry<TopicPartition, OffsetManager> tpOffset : offsetManagers.entrySet()) {
final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -542,7 +538,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
}
- final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
+ final OffsetManager offsetManager = offsetManagers.get(tp);
offsetManager.commit(tpOffset.getValue());
LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
}
@@ -572,11 +568,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (!emitted.contains(msgId)) {
LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
- + "came from a topic-partition that this consumer group instance is no longer tracking "
- + "due to rebalance/partition reassignment. No action taken.", msgId);
+ + "came from a topic-partition that this consumer group instance is no longer tracking "
+ + "due to rebalance/partition reassignment. No action taken.", msgId);
} else {
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
- + " This should never occur barring errors in the RetryService implementation or the spout code.");
+ + " This should never occur barring errors in the RetryService implementation or the spout code.");
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
emitted.remove(msgId);
}
@@ -618,7 +614,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void activate() {
try {
- kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
refreshAssignment();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
@@ -637,7 +632,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void deactivate() {
try {
- shutdown();
+ commitIfNecessary();
} catch (InterruptException e) {
throwKafkaConsumerInterruptedException();
}
@@ -652,11 +647,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
}
+ private void commitIfNecessary() {
+ if (isAtLeastOnceProcessing()) {
+ commitOffsetsForAckedTuples();
+ }
+ }
+
private void shutdown() {
try {
- if (isAtLeastOnceProcessing()) {
- commitOffsetsForAckedTuples(kafkaConsumer.assignment());
- }
+ commitIfNecessary();
} finally {
//remove resources
kafkaConsumer.close();
@@ -718,7 +717,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
@VisibleForTesting
- KafkaOffsetMetric getKafkaOffsetMetric() {
+ KafkaOffsetMetric<K, V> getKafkaOffsetMetric() {
return kafkaOffsetMetric;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index d6ed209..26eb135 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -51,13 +51,14 @@ import org.slf4j.LoggerFactory;
* topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
* </p>
*/
-public class KafkaOffsetMetric implements IMetric {
+public class KafkaOffsetMetric<K, V> implements IMetric {
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
- private final Supplier<KafkaConsumer> consumerSupplier;
+ private final Supplier<KafkaConsumer<K,V>> consumerSupplier;
- public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
+ public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
+ Supplier<KafkaConsumer<K, V>> consumerSupplier) {
this.offsetManagerSupplier = offsetManagerSupplier;
this.consumerSupplier = consumerSupplier;
}
@@ -66,7 +67,7 @@ public class KafkaOffsetMetric implements IMetric {
public Object getValueAndReset() {
Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
- KafkaConsumer kafkaConsumer = consumerSupplier.get();
+ KafkaConsumer<K,V> kafkaConsumer = consumerSupplier.get();
if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index 98aed93..0692c91 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -59,7 +59,7 @@ public abstract class KafkaSpoutAbstractTest {
final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
final long commitOffsetPeriodMs;
- KafkaConsumer<String, String> consumerSpy;
+ private KafkaConsumer<String, String> consumerSpy;
KafkaSpout<String, String> spout;
@Captor
@@ -85,6 +85,10 @@ public abstract class KafkaSpoutAbstractTest {
simulatedTime = new Time.SimulatedTime();
}
+
+ protected KafkaConsumer<String, String> getKafkaConsumer() {
+ return consumerSpy;
+ }
private KafkaConsumerFactory<String, String> createConsumerFactory() {
http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index 1b3a490..e763e00 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -17,7 +17,8 @@
package org.apache.storm.kafka.spout;
import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.KafkaUnitExtension;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
@@ -64,30 +66,23 @@ public class KafkaSpoutReactivationTest {
private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
private final long commitOffsetPeriodMs = 2_000;
private KafkaConsumer<String, String> consumerSpy;
- private KafkaConsumer<String, String> postReactivationConsumerSpy;
private KafkaSpout<String, String> spout;
private final int maxPollRecords = 10;
- @BeforeEach
- public void setUp() {
+ public void prepareSpout(int messageCount, FirstPollOffsetStrategy firstPollOffsetStrategy) throws Exception {
KafkaSpoutConfig<String, String> spoutConfig =
SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
SingleTopicKafkaSpoutConfiguration.TOPIC))
- .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+ .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
.build();
KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
- this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
when(consumerFactoryMock.createConsumer(any()))
- .thenReturn(consumerSpy)
- .thenReturn(postReactivationConsumerSpy);
+ .thenReturn(consumerSpy);
this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock, new TopicAssigner());
- }
-
- private void prepareSpout(int messageCount) throws Exception {
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
}
@@ -100,11 +95,10 @@ public class KafkaSpoutReactivationTest {
return messageId.getValue();
}
- @Test
- public void testSpoutMustHandleReactivationGracefully() throws Exception {
+ private void doReactivationTest(FirstPollOffsetStrategy firstPollOffsetStrategy) throws Exception {
try (Time.SimulatedTime time = new Time.SimulatedTime()) {
int messageCount = maxPollRecords * 2;
- prepareSpout(messageCount);
+ prepareSpout(messageCount, firstPollOffsetStrategy);
//Emit and ack some tuples, ensure that some polled tuples remain cached in the spout by emitting less than maxPollRecords
int beforeReactivationEmits = maxPollRecords - 3;
@@ -118,6 +112,7 @@ public class KafkaSpoutReactivationTest {
//Cycle spout activation
spout.deactivate();
SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, beforeReactivationEmits - 1);
+ clearInvocations(consumerSpy);
//Tuples may be acked/failed after the spout deactivates, so we have to be able to handle this too
spout.ack(ackAfterDeactivateMessageId);
spout.activate();
@@ -133,7 +128,7 @@ public class KafkaSpoutReactivationTest {
spout.nextTuple();
//Verify that no more tuples are emitted and all tuples are committed
- SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(postReactivationConsumerSpy, commitCapture, messageCount);
+ SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, messageCount);
clearInvocations(collector);
spout.nextTuple();
@@ -142,4 +137,32 @@ public class KafkaSpoutReactivationTest {
}
+ @Test
+ public void testSpoutShouldResumeWhereItLeftOffWithUncommittedEarliestStrategy() throws Exception {
+ //With uncommitted earliest the spout should pick up where it left off when reactivating.
+ doReactivationTest(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+ }
+
+ @Test
+ public void testSpoutShouldResumeWhereItLeftOffWithEarliestStrategy() throws Exception {
+ //With earliest, the spout should also resume where it left off, rather than restart at the earliest offset.
+ doReactivationTest(FirstPollOffsetStrategy.EARLIEST);
+ }
+
+ @Test
+ public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception {
+ //Storm will try to get metrics from the spout even while deactivated, the spout must be able to handle this
+ prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+
+ for (int i = 0; i < 5; i++) {
+ KafkaSpoutMessageId msgId = emitOne();
+ spout.ack(msgId);
+ }
+
+ spout.deactivate();
+
+ Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+ assertThat(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag"), is(5L));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 1d877f5..84def11 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -101,7 +101,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
spout.ack(failedIdReplayCaptor.getValue());
spout.nextTuple();
- verify(consumerSpy).commitSync(commitCapture.capture());
+ verify(getKafkaConsumer()).commitSync(commitCapture.capture());
Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
index 3276210..f45aaec 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -52,8 +52,6 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAb
verifyAllMessagesCommitted(1);
- consumerSpy = createConsumerSpy();
-
spout.activate();
nextTuple_verifyEmitted_ack_resetCollector(1);
[2/2] storm git commit: Merge branch 'STORM-3013' of
https://github.com/srdo/storm into STORM-3013-merge
Posted by ka...@apache.org.
Merge branch 'STORM-3013' of https://github.com/srdo/storm into STORM-3013-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6872adfc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6872adfc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6872adfc
Branch: refs/heads/master
Commit: 6872adfc260a9417e4dadc0e1c867476ed45bce6
Parents: db86bad 528ab30
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jul 9 12:33:37 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jul 9 12:33:37 2018 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 49 +++++++++---------
.../kafka/spout/metrics/KafkaOffsetMetric.java | 9 ++--
.../kafka/spout/KafkaSpoutAbstractTest.java | 6 ++-
.../kafka/spout/KafkaSpoutReactivationTest.java | 53 ++++++++++++++------
.../kafka/spout/KafkaSpoutSingleTopicTest.java | 2 +-
...outTopologyDeployActivateDeactivateTest.java | 2 -
6 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------