You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/24 04:31:38 UTC

kafka git commit: KAFKA-4058: Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset

Repository: kafka
Updated Branches:
  refs/heads/trunk 7afdad8c3 -> 63da48721


KAFKA-4058: Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset

 - fixed consumer group dead condition
 - disabled state store cache

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2056 from mjsax/KAFKA-4058-instableResetToolTest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63da4872
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63da4872
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63da4872

Branch: refs/heads/trunk
Commit: 63da487213765cb543a4255a83acb876a57d3634
Parents: 7afdad8
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sun Oct 23 21:31:34 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Oct 23 21:31:34 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/StreamsResetter.java    |  2 +-
 .../kafka/streams/integration/ResetIntegrationTest.java | 12 +++++++-----
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/63da4872/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 8392f66..71fb082 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -90,7 +90,7 @@ public class StreamsResetter {
 
             adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
             final String groupId = this.options.valueOf(applicationIdOption);
-            if (!adminClient.describeConsumerGroup(groupId).consumers().isEmpty()) {
+            if (!adminClient.describeConsumerGroup(groupId).consumers().get().isEmpty()) {
                 throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
                     "Make sure to stop all running application instances before running the reset tool.");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/63da4872/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index ced1109..110bfba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -131,15 +131,16 @@ public class ResetIntegrationTest {
         ).get(0);
 
         streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+        final int timeoutMultiplier = 5;
+        TestUtils.waitForCondition(consumerGroupInactive, timeoutMultiplier * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (timeoutMultiplier * STREAMS_CONSUMER_TIMEOUT) + " ms.");
 
         // RESET
         streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
         streams.cleanUp();
         cleanGlobal();
-        TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        TestUtils.waitForCondition(consumerGroupInactive, timeoutMultiplier * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (timeoutMultiplier * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
 
         assertInternalTopicsGotDeleted();
 
@@ -170,6 +171,7 @@ public class ResetIntegrationTest {
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -300,7 +302,7 @@ public class ResetIntegrationTest {
     private class WaitUntilConsumerGroupGotClosed implements TestCondition {
         @Override
         public boolean conditionMet() {
-            return adminClient.describeConsumerGroup(APP_ID).consumers().isEmpty();
+            return adminClient.describeConsumerGroup(APP_ID).consumers().get().isEmpty();
         }
     }