You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/24 04:31:38 UTC
kafka git commit: KAFKA-4058: Failure in
org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset
Repository: kafka
Updated Branches:
refs/heads/trunk 7afdad8c3 -> 63da48721
KAFKA-4058: Failure in org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterReset
- fixed consumer group dead condition
- disabled state store cache
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2056 from mjsax/KAFKA-4058-instableResetToolTest
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63da4872
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63da4872
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63da4872
Branch: refs/heads/trunk
Commit: 63da487213765cb543a4255a83acb876a57d3634
Parents: 7afdad8
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sun Oct 23 21:31:34 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Oct 23 21:31:34 2016 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/tools/StreamsResetter.java | 2 +-
.../kafka/streams/integration/ResetIntegrationTest.java | 12 +++++++-----
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/63da4872/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 8392f66..71fb082 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -90,7 +90,7 @@ public class StreamsResetter {
adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
final String groupId = this.options.valueOf(applicationIdOption);
- if (!adminClient.describeConsumerGroup(groupId).consumers().isEmpty()) {
+ if (!adminClient.describeConsumerGroup(groupId).consumers().get().isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
"Make sure to stop all running application instances before running the reset tool.");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/63da4872/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index ced1109..110bfba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -131,15 +131,16 @@ public class ResetIntegrationTest {
).get(0);
streams.close();
- TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT,
- "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+ final int timeoutMultiplier = 5;
+ TestUtils.waitForCondition(consumerGroupInactive, timeoutMultiplier * STREAMS_CONSUMER_TIMEOUT,
+ "Streams Application consumer group did not time out after " + (timeoutMultiplier * STREAMS_CONSUMER_TIMEOUT) + " ms.");
// RESET
streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
streams.cleanUp();
cleanGlobal();
- TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT,
- "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+ TestUtils.waitForCondition(consumerGroupInactive, timeoutMultiplier * CLEANUP_CONSUMER_TIMEOUT,
+ "Reset Tool consumer group did not time out after " + (timeoutMultiplier * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
assertInternalTopicsGotDeleted();
@@ -170,6 +171,7 @@ public class ResetIntegrationTest {
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -300,7 +302,7 @@ public class ResetIntegrationTest {
private class WaitUntilConsumerGroupGotClosed implements TestCondition {
@Override
public boolean conditionMet() {
- return adminClient.describeConsumerGroup(APP_ID).consumers().isEmpty();
+ return adminClient.describeConsumerGroup(APP_ID).consumers().get().isEmpty();
}
}