You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/09 21:36:00 UTC

kafka git commit: KAFKA-5063: Fix flaky o.a.k.streams.integration.ResetIntegrationTest

Repository: kafka
Updated Branches:
  refs/heads/trunk b982eefd3 -> 7371bf7f6


KAFKA-5063: Fix flaky o.a.k.streams.integration.ResetIntegrationTest

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #2931 from mjsax/kafka-5140-flaky-reset-integration-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7371bf7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7371bf7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7371bf7f

Branch: refs/heads/trunk
Commit: 7371bf7f65d1245b084f17e534b5728d5929e207
Parents: b982eef
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue May 9 14:35:57 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 9 14:35:57 2017 -0700

----------------------------------------------------------------------
 .../integration/ResetIntegrationTest.java       | 21 +++++++--------
 .../integration/utils/EmbeddedKafkaCluster.java | 28 ++++++++++++++++----
 2 files changed, 33 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7371bf7f/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 2b4f14c..775ac8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -74,7 +74,10 @@ public class ResetIntegrationTest {
         // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
         // very long sleep times
         props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
-        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props);
+        // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
+        // otherwise, input records could fall into different windows for different runs depending on the initial mock time
+        final long alignedTime = (System.currentTimeMillis() / 1000) * 1000;
+        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime);
     }
 
     private static final String APP_ID = "cleanup-integration-test";
@@ -143,14 +146,13 @@ public class ResetIntegrationTest {
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC,
-            10,
-            60000);
+            10);
         // receive only first values to make sure intermediate user topic is not consumed completely
         // => required to test "seekToEnd" for intermediate topics
         final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC_2,
-            10
+            40
         );
 
         streams.close();
@@ -177,12 +179,11 @@ public class ResetIntegrationTest {
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC,
-            10,
-            60000);
+            10);
         final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
             resultTopicConsumerConfig,
             OUTPUT_TOPIC_2_RERUN,
-            10
+            40
         );
         streams.close();
 
@@ -229,8 +230,7 @@ public class ResetIntegrationTest {
         final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                 resultTopicConsumerConfig,
                 OUTPUT_TOPIC,
-                10,
-                60000);
+                10);
 
         streams.close();
         TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
@@ -250,8 +250,7 @@ public class ResetIntegrationTest {
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                 resultTopicConsumerConfig,
                 OUTPUT_TOPIC,
-                10,
-                60000);
+                10);
         streams.close();
 
         assertThat(resultRerun, equalTo(result));

http://git-wip-us.apache.org/repos/asf/kafka/blob/7371bf7f/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 70d271c..6a0fc51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -37,21 +37,36 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
     private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
     private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
-    public static final int TOPIC_CREATION_TIMEOUT = 30000;
+    private static final int TOPIC_CREATION_TIMEOUT = 30000;
     private EmbeddedZookeeper zookeeper = null;
     private final KafkaEmbedded[] brokers;
     private final Properties brokerConfig;
+    public final MockTime time;
 
     public EmbeddedKafkaCluster(final int numBrokers) {
         this(numBrokers, new Properties());
     }
 
-    public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers,
+                                final Properties brokerConfig) {
+        this(numBrokers, brokerConfig, System.currentTimeMillis());
+    }
+
+    public EmbeddedKafkaCluster(final int numBrokers,
+                                final Properties brokerConfig,
+                                final long mockTimeMillisStart) {
+        this(numBrokers, brokerConfig, mockTimeMillisStart, System.nanoTime());
+    }
+
+    public EmbeddedKafkaCluster(final int numBrokers,
+                                final Properties brokerConfig,
+                                final long mockTimeMillisStart,
+                                final long mockTimeNanoStart) {
         brokers = new KafkaEmbedded[numBrokers];
         this.brokerConfig = brokerConfig;
-    }
+        time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
 
-    public final MockTime time = new MockTime();
+    }
 
     /**
      * Creates and starts a Kafka cluster.
@@ -82,8 +97,9 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     }
 
     private void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey))
+        if (!props.containsKey(propertyKey)) {
             brokerConfig.put(propertyKey, propertyValue);
+        }
     }
 
     /**
@@ -115,10 +131,12 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         return brokers[0].brokerList();
     }
 
+    @Override
     protected void before() throws Throwable {
         start();
     }
 
+    @Override
     protected void after() {
         stop();
     }