Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/09 03:34:47 UTC

[1/2] storm git commit: STORM-3013: Keep KafkaConsumer open when storm-kafka-client spout is deactivated, in order to keep metrics working

Repository: storm
Updated Branches:
  refs/heads/master db86bade0 -> 6872adfc2


STORM-3013: Keep KafkaConsumer open when storm-kafka-client spout is deactivated, in order to keep metrics working
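
In effect, the patch moves KafkaConsumer creation from activate() into open() and turns deactivate() into a commit-only step, so the "kafkaOffset" metric registered in open() can keep reading from a live consumer while the topology is deactivated. A simplified before/after sketch of the lifecycle (condensed from the diff below, not the literal class; error handling elided):

    // Before this patch: the consumer existed only between activate() and deactivate()
    public void activate()   { kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); refreshAssignment(); }
    public void deactivate() { shutdown(); }   // committed acked offsets, then closed the consumer

    // After this patch: the consumer is created in open() and survives deactivation
    public void open(...)    { /* ... */ kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig); /* ... */ }
    public void activate()   { refreshAssignment(); }
    public void deactivate() { commitIfNecessary(); }  // commit only; the consumer stays open so metrics keep working
    // shutdown() still runs commitIfNecessary() and then kafkaConsumer.close() at spout teardown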


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/528ab30b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/528ab30b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/528ab30b

Branch: refs/heads/master
Commit: 528ab30b5c49dbeae37df4cb9a97c12ee8989c01
Parents: db86bad
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Mar 30 19:12:01 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Jul 6 13:13:46 2018 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 49 +++++++++---------
 .../kafka/spout/metrics/KafkaOffsetMetric.java  |  9 ++--
 .../kafka/spout/KafkaSpoutAbstractTest.java     |  6 ++-
 .../kafka/spout/KafkaSpoutReactivationTest.java | 53 ++++++++++++++------
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  |  2 +-
 ...outTopologyDeployActivateDeactivateTest.java |  2 -
 6 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 607b8a5..f5969af 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -24,7 +24,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -103,7 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient Timer refreshAssignmentTimer;
     private transient TopologyContext context;
     private transient CommitMetadataManager commitMetadataManager;
-    private transient KafkaOffsetMetric kafkaOffsetMetric;
+    private transient KafkaOffsetMetric<K, V> kafkaOffsetMetric;
     private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
@@ -145,6 +144,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
 
+        kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
+
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
             registerMetric();
@@ -155,7 +156,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void registerMetric() {
         LOG.info("Registering Spout Metrics");
-        kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
+        kafkaOffsetMetric = new KafkaOffsetMetric<>(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
         context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
     }
 
@@ -186,7 +187,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             if (isAtLeastOnceProcessing()) {
-                commitOffsetsForAckedTuples(new HashSet<>(partitions));
+                commitOffsetsForAckedTuples();
             }
         }
 
@@ -277,9 +278,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
             if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
                 if (isAtLeastOnceProcessing()) {
-                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+                    commitOffsetsForAckedTuples();
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
                         createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                     kafkaConsumer.commitAsync(offsetsToCommit, null);
                     LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
@@ -367,7 +368,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 numPolledRecords);
             if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                 //Commit polled records immediately to ensure delivery is at-most-once.
-                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
                     createFetchedOffsetsMetadata(kafkaConsumer.assignment());
                 kafkaConsumer.commitSync(offsetsToCommit);
                 LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
@@ -497,15 +498,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
         return offsetsToCommit;
     }
-
-    private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
-        // Find offsets that are ready to be committed for every assigned topic partition
-        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream()
-            .filter(entry -> assignedPartitions.contains(entry.getKey()))
-            .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-
+    
+    private void commitOffsetsForAckedTuples() {
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
+        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : offsetManagers.entrySet()) {
             final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -542,7 +538,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     }
                 }
 
-                final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
+                final OffsetManager offsetManager = offsetManagers.get(tp);
                 offsetManager.commit(tpOffset.getValue());
                 LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
             }
@@ -572,11 +568,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         if (!emitted.contains(msgId)) {
             LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
-                        + "came from a topic-partition that this consumer group instance is no longer tracking "
-                        + "due to rebalance/partition reassignment. No action taken.", msgId);
+                + "came from a topic-partition that this consumer group instance is no longer tracking "
+                + "due to rebalance/partition reassignment. No action taken.", msgId);
         } else {
             Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
-                        + " This should never occur barring errors in the RetryService implementation or the spout code.");
+                + " This should never occur barring errors in the RetryService implementation or the spout code.");
             offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             emitted.remove(msgId);
         }
@@ -618,7 +614,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void activate() {
         try {
-            kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
             refreshAssignment();
         } catch (InterruptException e) {
             throwKafkaConsumerInterruptedException();
@@ -637,7 +632,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void deactivate() {
         try {
-            shutdown();
+            commitIfNecessary();
         } catch (InterruptException e) {
             throwKafkaConsumerInterruptedException();
         }
@@ -652,11 +647,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
+    private void commitIfNecessary() {
+        if (isAtLeastOnceProcessing()) {
+            commitOffsetsForAckedTuples();
+        }
+    }
+
     private void shutdown() {
         try {
-            if (isAtLeastOnceProcessing()) {
-                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
-            }
+            commitIfNecessary();
         } finally {
             //remove resources
             kafkaConsumer.close();
@@ -718,7 +717,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     @VisibleForTesting
-    KafkaOffsetMetric getKafkaOffsetMetric() {
+    KafkaOffsetMetric<K, V> getKafkaOffsetMetric() {
         return kafkaOffsetMetric;
     }
 }
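
Two consequences of the hunks above are easy to miss. First, commitOffsetsForAckedTuples() no longer snapshots kafkaConsumer.assignment() and filters offsetManagers against it; it commits every tracked offset manager, so callers such as deactivate() no longer need a live assignment to commit. Second, the metric registration passes suppliers rather than direct references, which is why the metric still resolves the current consumer and offset managers whenever Storm polls it. For reference, the registration as it stands after this patch:

    // Suppliers are dereferenced at metric-collection time, so the metric
    // always sees the current consumer and the current offset-manager map.
    kafkaOffsetMetric = new KafkaOffsetMetric<>(
        () -> Collections.unmodifiableMap(offsetManagers),
        () -> kafkaConsumer);
    context.registerMetric("kafkaOffset", kafkaOffsetMetric,
        kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());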

http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index d6ed209..26eb135 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -51,13 +51,14 @@ import org.slf4j.LoggerFactory;
  * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
  * </p>
  */
-public class KafkaOffsetMetric implements IMetric {
+public class KafkaOffsetMetric<K, V> implements IMetric {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
     private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
-    private final Supplier<KafkaConsumer> consumerSupplier;
+    private final Supplier<KafkaConsumer<K,V>> consumerSupplier;
 
-    public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
+    public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
+        Supplier<KafkaConsumer<K, V>> consumerSupplier) {
         this.offsetManagerSupplier = offsetManagerSupplier;
         this.consumerSupplier = consumerSupplier;
     }
@@ -66,7 +67,7 @@ public class KafkaOffsetMetric implements IMetric {
     public Object getValueAndReset() {
 
         Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
-        KafkaConsumer kafkaConsumer = consumerSupplier.get();
+        KafkaConsumer<K,V> kafkaConsumer = consumerSupplier.get();
 
         if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
             LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
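
Making KafkaOffsetMetric generic in <K, V> removes the raw KafkaConsumer references from the supplier signatures without changing behavior. A minimal sketch of wiring it up outside the spout (hypothetical harness; the guard in the hunk above means getValueAndReset() just logs and returns early while either supplier yields nothing):

    // Hypothetical harness fields, mirroring the spout's own:
    private Map<TopicPartition, OffsetManager> offsetManagers = new HashMap<>();
    private KafkaConsumer<String, String> kafkaConsumer;    // set once a consumer is created

    // Construction mirrors registerMetric() in the spout:
    KafkaOffsetMetric<String, String> metric = new KafkaOffsetMetric<>(
        () -> Collections.unmodifiableMap(offsetManagers),
        () -> kafkaConsumer);
    Object perTopicStats = metric.getValueAndReset();       // logs and bails out while either is missing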

http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index 98aed93..0692c91 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -59,7 +59,7 @@ public abstract class KafkaSpoutAbstractTest {
     final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     final long commitOffsetPeriodMs;
 
-    KafkaConsumer<String, String> consumerSpy;
+    private KafkaConsumer<String, String> consumerSpy;
     KafkaSpout<String, String> spout;
 
     @Captor
@@ -85,6 +85,10 @@ public abstract class KafkaSpoutAbstractTest {
 
         simulatedTime = new Time.SimulatedTime();
     }
+    
+    protected KafkaConsumer<String, String> getKafkaConsumer() {
+        return consumerSpy;
+    }
 
     private KafkaConsumerFactory<String, String> createConsumerFactory() {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index 1b3a490..e763e00 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -17,7 +17,8 @@
 package org.apache.storm.kafka.spout;
 
 import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.KafkaUnitExtension;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
@@ -64,30 +66,23 @@ public class KafkaSpoutReactivationTest {
     private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
     private final long commitOffsetPeriodMs = 2_000;
     private KafkaConsumer<String, String> consumerSpy;
-    private KafkaConsumer<String, String> postReactivationConsumerSpy;
     private KafkaSpout<String, String> spout;
     private final int maxPollRecords = 10;
 
-    @BeforeEach
-    public void setUp() {
+    public void prepareSpout(int messageCount, FirstPollOffsetStrategy firstPollOffsetStrategy) throws Exception {
         KafkaSpoutConfig<String, String> spoutConfig =
             SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
                     SingleTopicKafkaSpoutConfiguration.TOPIC))
-                .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+                .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
                 .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
                 .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
                 .build();
         KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
         this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
-        this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
         KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
         when(consumerFactoryMock.createConsumer(any()))
-            .thenReturn(consumerSpy)
-            .thenReturn(postReactivationConsumerSpy);
+            .thenReturn(consumerSpy);
         this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock, new TopicAssigner());
-    }
-
-    private void prepareSpout(int messageCount) throws Exception {
         SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
         SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
     }
@@ -100,11 +95,10 @@ public class KafkaSpoutReactivationTest {
         return messageId.getValue();
     }
 
-    @Test
-    public void testSpoutMustHandleReactivationGracefully() throws Exception {
+    private void doReactivationTest(FirstPollOffsetStrategy firstPollOffsetStrategy) throws Exception {
         try (Time.SimulatedTime time = new Time.SimulatedTime()) {
             int messageCount = maxPollRecords * 2;
-            prepareSpout(messageCount);
+            prepareSpout(messageCount, firstPollOffsetStrategy);
 
             //Emit and ack some tuples, ensure that some polled tuples remain cached in the spout by emitting less than maxPollRecords
             int beforeReactivationEmits = maxPollRecords - 3;
@@ -118,6 +112,7 @@ public class KafkaSpoutReactivationTest {
             //Cycle spout activation
             spout.deactivate();
             SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, beforeReactivationEmits - 1);
+            clearInvocations(consumerSpy);
             //Tuples may be acked/failed after the spout deactivates, so we have to be able to handle this too
             spout.ack(ackAfterDeactivateMessageId);
             spout.activate();
@@ -133,7 +128,7 @@ public class KafkaSpoutReactivationTest {
             spout.nextTuple();
 
             //Verify that no more tuples are emitted and all tuples are committed
-            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(postReactivationConsumerSpy, commitCapture, messageCount);
+            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, messageCount);
 
             clearInvocations(collector);
             spout.nextTuple();
@@ -142,4 +137,32 @@ public class KafkaSpoutReactivationTest {
 
     }
 
+    @Test
+    public void testSpoutShouldResumeWhereItLeftOffWithUncommittedEarliestStrategy() throws Exception {
+        //With uncommitted earliest the spout should pick up where it left off when reactivating.
+        doReactivationTest(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+    }
+
+    @Test
+    public void testSpoutShouldResumeWhereItLeftOffWithEarliestStrategy() throws Exception {
+        //With earliest, the spout should also resume where it left off, rather than restart at the earliest offset.
+        doReactivationTest(FirstPollOffsetStrategy.EARLIEST);
+    }
+
+    @Test
+    public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception {
+        //Storm will try to get metrics from the spout even while deactivated, the spout must be able to handle this
+        prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+
+        for (int i = 0; i < 5; i++) {
+            KafkaSpoutMessageId msgId = emitOne();
+            spout.ack(msgId);
+        }
+
+        spout.deactivate();
+
+        Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertThat(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag"), is(5L));
+    }
+
 }
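
The new testSpoutMustHandleGettingMetricsWhileDeactivated exercises the headline change directly: Storm polls registered metrics even while a topology is deactivated, so getValueAndReset() must still reach a live consumer after deactivate(). Per the assertion above, the result is a map keyed "<topicName>/<metricName>"; an illustrative read (topic name assumed):

    Map<String, Long> stats = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
    long lag = stats.get("myTopic/totalSpoutLag");   // 10 records populated, 5 acked => lag of 5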

http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 1d877f5..84def11 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -101,7 +101,7 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
         Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
         spout.ack(failedIdReplayCaptor.getValue());
         spout.nextTuple();
-        verify(consumerSpy).commitSync(commitCapture.capture());
+        verify(getKafkaConsumer()).commitSync(commitCapture.capture());
 
         Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
         TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);

http://git-wip-us.apache.org/repos/asf/storm/blob/528ab30b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
index 3276210..f45aaec 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -52,8 +52,6 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAb
 
         verifyAllMessagesCommitted(1);
 
-        consumerSpy = createConsumerSpy();
-
         spout.activate();
 
         nextTuple_verifyEmitted_ack_resetCollector(1);


[2/2] storm git commit: Merge branch 'STORM-3013' of https://github.com/srdo/storm into STORM-3013-merge

Posted by ka...@apache.org.
Merge branch 'STORM-3013' of https://github.com/srdo/storm into STORM-3013-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6872adfc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6872adfc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6872adfc

Branch: refs/heads/master
Commit: 6872adfc260a9417e4dadc0e1c867476ed45bce6
Parents: db86bad 528ab30
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jul 9 12:33:37 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jul 9 12:33:37 2018 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 49 +++++++++---------
 .../kafka/spout/metrics/KafkaOffsetMetric.java  |  9 ++--
 .../kafka/spout/KafkaSpoutAbstractTest.java     |  6 ++-
 .../kafka/spout/KafkaSpoutReactivationTest.java | 53 ++++++++++++++------
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  |  2 +-
 ...outTopologyDeployActivateDeactivateTest.java |  2 -
 6 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------