You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/17 18:02:29 UTC

[kafka] branch trunk updated: MINOR: Fix race condition in Streams tests (#6748)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1286ed  MINOR: Fix race condition in Streams tests (#6748)
a1286ed is described below

commit a1286edb040a5d368a76d6c9c809c5cf0c080ec8
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri May 17 20:02:13 2019 +0200

    MINOR: Fix race condition in Streams tests (#6748)
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../kafka/streams/integration/GlobalKTableEOSIntegrationTest.java | 6 +++---
 .../kafka/streams/integration/GlobalKTableIntegrationTest.java    | 8 ++++----
 .../integration/KStreamAggregationDedupIntegrationTest.java       | 6 +++---
 .../streams/integration/KStreamAggregationIntegrationTest.java    | 6 +++---
 .../kafka/streams/integration/QueryableStateIntegrationTest.java  | 6 +++---
 5 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 1aa99f6..b247ec2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -53,6 +53,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableEOSIntegrationTest {
@@ -68,7 +69,7 @@ public class GlobalKTableEOSIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER =
             new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
 
-    private static volatile int testNo = 0;
+    private static volatile AtomicInteger testNo = new AtomicInteger(0);
     private final MockTime mockTime = CLUSTER.time;
     private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
     private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
@@ -85,11 +86,10 @@ public class GlobalKTableEOSIntegrationTest {
 
     @Before
     public void before() throws Exception {
-        testNo++;
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "globalTableTopic-table-eos-test-" + testNo;
+        final String applicationId = "globalTableTopic-table-eos-test-" + testNo.incrementAndGet();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 6617512..d3e0d24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -51,9 +51,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.hamcrest.core.IsEqual.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
@@ -62,7 +63,7 @@ public class GlobalKTableIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    private static volatile int testNo = 0;
+    private static volatile AtomicInteger testNo = new AtomicInteger(0);
     private final MockTime mockTime = CLUSTER.time;
     private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
     private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
@@ -79,11 +80,10 @@ public class GlobalKTableIntegrationTest {
 
     @Before
     public void before() throws Exception {
-        testNo++;
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "globalTableTopic-table-test-" + testNo;
+        final String applicationId = "globalTableTopic-table-test-" + testNo.incrementAndGet();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index e1c9b5b..14346cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -52,6 +52,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.time.Duration.ofMillis;
 
@@ -69,7 +70,7 @@ public class KStreamAggregationDedupIntegrationTest {
         new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private final MockTime mockTime = CLUSTER.time;
-    private static volatile int testNo = 0;
+    private static volatile AtomicInteger testNo = new AtomicInteger(0);
     private StreamsBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
@@ -81,11 +82,10 @@ public class KStreamAggregationDedupIntegrationTest {
 
     @Before
     public void before() throws InterruptedException {
-        testNo++;
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "kgrouped-stream-test-" + testNo;
+        final String applicationId = "kgrouped-stream-test-" + testNo.incrementAndGet();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index ea3695e..2f06af7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -82,6 +82,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.time.Duration.ofMillis;
 import static java.time.Instant.ofEpochMilli;
@@ -100,7 +101,7 @@ public class KStreamAggregationIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER =
         new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    private static volatile int testNo = 0;
+    private static volatile AtomicInteger testNo = new AtomicInteger(0);
     private final MockTime mockTime = CLUSTER.time;
     private StreamsBuilder builder;
     private Properties streamsConfiguration;
@@ -116,11 +117,10 @@ public class KStreamAggregationIntegrationTest {
 
     @Before
     public void before() throws InterruptedException {
-        testNo++;
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "kgrouped-stream-test-" + testNo;
+        final String applicationId = "kgrouped-stream-test-" + testNo.incrementAndGet();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 14d9e5d..c5dbabe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -81,6 +81,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
@@ -121,7 +122,7 @@ public class QueryableStateIntegrationTest {
     private KafkaStreams kafkaStreams;
     private Comparator<KeyValue<String, String>> stringComparator;
     private Comparator<KeyValue<String, Long>> stringLongComparator;
-    private static int testNo = 0;
+    private static volatile AtomicInteger testNo = new AtomicInteger(0);
 
     private void createTopics() throws Exception {
         streamOne = streamOne + "-" + testNo;
@@ -176,10 +177,9 @@ public class QueryableStateIntegrationTest {
 
     @Before
     public void before() throws Exception {
-        testNo++;
         createTopics();
         streamsConfiguration = new Properties();
-        final String applicationId = "queryable-state-" + testNo;
+        final String applicationId = "queryable-state-" + testNo.incrementAndGet();
 
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());