You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/09 21:36:00 UTC
kafka git commit: KAFKA-5063: Fix flaky o.a.k.streams.integration.ResetIntegrationTest
Repository: kafka
Updated Branches:
refs/heads/trunk b982eefd3 -> 7371bf7f6
KAFKA-5063: Fix flaky o.a.k.streams.integration.ResetIntegrationTest
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy, Eno Thereska, Guozhang Wang
Closes #2931 from mjsax/kafka-5140-flaky-reset-integration-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7371bf7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7371bf7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7371bf7f
Branch: refs/heads/trunk
Commit: 7371bf7f65d1245b084f17e534b5728d5929e207
Parents: b982eef
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue May 9 14:35:57 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 9 14:35:57 2017 -0700
----------------------------------------------------------------------
.../integration/ResetIntegrationTest.java | 21 +++++++--------
.../integration/utils/EmbeddedKafkaCluster.java | 28 ++++++++++++++++----
2 files changed, 33 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7371bf7f/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 2b4f14c..775ac8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -74,7 +74,10 @@ public class ResetIntegrationTest {
// expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
// very long sleep times
props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
- CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
+ // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
+ // otherwise, input records could fall into different windows for different runs depending on the initial mock time
+ final long alignedTime = (System.currentTimeMillis() / 1000) * 1000;
+ CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime);
}
private static final String APP_ID = "cleanup-integration-test";
@@ -143,14 +146,13 @@ public class ResetIntegrationTest {
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
- 10,
- 60000);
+ 10);
// receive only first values to make sure intermediate user topic is not consumed completely
// => required to test "seekToEnd" for intermediate topics
final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC_2,
- 10
+ 40
);
streams.close();
@@ -177,12 +179,11 @@ public class ResetIntegrationTest {
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
- 10,
- 60000);
+ 10);
final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC_2_RERUN,
- 10
+ 40
);
streams.close();
@@ -229,8 +230,7 @@ public class ResetIntegrationTest {
final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
- 10,
- 60000);
+ 10);
streams.close();
TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
@@ -250,8 +250,7 @@ public class ResetIntegrationTest {
final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
resultTopicConsumerConfig,
OUTPUT_TOPIC,
- 10,
- 60000);
+ 10);
streams.close();
assertThat(resultRerun, equalTo(result));
http://git-wip-us.apache.org/repos/asf/kafka/blob/7371bf7f/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 70d271c..6a0fc51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -37,21 +37,36 @@ public class EmbeddedKafkaCluster extends ExternalResource {
private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
- public static final int TOPIC_CREATION_TIMEOUT = 30000;
+ private static final int TOPIC_CREATION_TIMEOUT = 30000;
private EmbeddedZookeeper zookeeper = null;
private final KafkaEmbedded[] brokers;
private final Properties brokerConfig;
+ public final MockTime time;
public EmbeddedKafkaCluster(final int numBrokers) {
this(numBrokers, new Properties());
}
- public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
+ public EmbeddedKafkaCluster(final int numBrokers,
+ final Properties brokerConfig) {
+ this(numBrokers, brokerConfig, System.currentTimeMillis());
+ }
+
+ public EmbeddedKafkaCluster(final int numBrokers,
+ final Properties brokerConfig,
+ final long mockTimeMillisStart) {
+ this(numBrokers, brokerConfig, mockTimeMillisStart, System.nanoTime());
+ }
+
+ public EmbeddedKafkaCluster(final int numBrokers,
+ final Properties brokerConfig,
+ final long mockTimeMillisStart,
+ final long mockTimeNanoStart) {
brokers = new KafkaEmbedded[numBrokers];
this.brokerConfig = brokerConfig;
- }
+ time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
- public final MockTime time = new MockTime();
+ }
/**
* Creates and starts a Kafka cluster.
@@ -82,8 +97,9 @@ public class EmbeddedKafkaCluster extends ExternalResource {
}
private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
- if (!props.containsKey(propertyKey))
+ if (!props.containsKey(propertyKey)) {
brokerConfig.put(propertyKey, propertyValue);
+ }
}
/**
@@ -115,10 +131,12 @@ public class EmbeddedKafkaCluster extends ExternalResource {
return brokers[0].brokerList();
}
+ @Override
protected void before() throws Throwable {
start();
}
+ @Override
protected void after() {
stop();
}