You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/25 21:10:34 UTC

[kafka] branch 1.0 updated: KAFKA-8026: Fix flaky regex source integration test 1.0 (#6463)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 7434ef1  KAFKA-8026: Fix flaky regex source integration test 1.0 (#6463)
7434ef1 is described below

commit 7434ef11d090bdf8583363e453552df6a1254f5c
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Mon Mar 25 17:10:23 2019 -0400

    KAFKA-8026: Fix flaky regex source integration test 1.0 (#6463)
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <jo...@confluent.io>
---
 .../integration/RegexSourceIntegrationTest.java    | 200 ++++++++++++---------
 1 file changed, 111 insertions(+), 89 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 5f0a107..6773b2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -44,7 +44,6 @@ import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -57,7 +56,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -90,12 +90,13 @@ public class RegexSourceIntegrationTest {
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
     private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
-    private KafkaStreams streams;
 
+    @Before
+    public void setUp() throws InterruptedException {
+        final Properties properties = new Properties();
+        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
-    @BeforeClass
-    public static void startKafkaCluster() throws InterruptedException {
-        CLUSTER.createTopics(
+        CLUSTER.deleteAndRecreateTopics(
             TOPIC_1,
             TOPIC_2,
             TOPIC_A,
@@ -105,26 +106,21 @@ public class RegexSourceIntegrationTest {
             FA_TOPIC,
             FOO_TOPIC,
             DEFAULT_OUTPUT_TOPIC);
+
+        CLUSTER.deleteTopicsAndWait(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);
+
         CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
         CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
-    }
 
-    @Before
-    public void setUp() {
-        final Properties properties = new Properties();
-        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
-                                                                 CLUSTER.bootstrapServers(),
-                                                                 STRING_SERDE_CLASSNAME,
-                                                                 STRING_SERDE_CLASSNAME,
-                                                                 properties);
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(UUID.randomUUID().toString(),
+            CLUSTER.bootstrapServers(),
+            STRING_SERDE_CLASSNAME,
+            STRING_SERDE_CLASSNAME,
+            properties);
     }
 
     @After
     public void tearDown() throws IOException {
-        if (streams != null) {
-            streams.close();
-        }
         // Remove any state from previous test runs
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
@@ -145,8 +141,8 @@ public class RegexSourceIntegrationTest {
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
+        final List<String> assignedTopics = new ArrayList<>();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                 return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
@@ -159,23 +155,30 @@ public class RegexSourceIntegrationTest {
             }
         });
 
+        try {
+            streams.start();
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    synchronized (assignedTopics) {
+                        return assignedTopics.equals(expectedFirstAssignment);
+                    }
+                }
+            }, STREAM_TASKS_NOT_UPDATED);
 
-        streams.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return assignedTopics.equals(expectedFirstAssignment);
-            }
-        }, STREAM_TASKS_NOT_UPDATED);
-
-        CLUSTER.createTopic("TEST-TOPIC-2");
+            CLUSTER.createTopic("TEST-TOPIC-2");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return assignedTopics.equals(expectedSecondAssignment);
-            }
-        }, STREAM_TASKS_NOT_UPDATED);
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    synchronized (assignedTopics) {
+                        return assignedTopics.equals(expectedSecondAssignment);
+                    }
+                }
+            }, STREAM_TASKS_NOT_UPDATED);
+        } finally {
+            streams.close(5, TimeUnit.SECONDS);
+        }
 
     }
 
@@ -196,8 +199,8 @@ public class RegexSourceIntegrationTest {
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
+        final List<String> assignedTopics = new ArrayList<>();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig, new DefaultKafkaClientSupplier() {
             @Override
             public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                 return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
@@ -210,23 +213,30 @@ public class RegexSourceIntegrationTest {
             }
         });
 
+        try {
+            streams.start();
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    synchronized (assignedTopics) {
+                        return assignedTopics.equals(expectedFirstAssignment);
+                    }
+                }
+            }, STREAM_TASKS_NOT_UPDATED);
 
-        streams.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return assignedTopics.equals(expectedFirstAssignment);
-            }
-        }, STREAM_TASKS_NOT_UPDATED);
-
-        CLUSTER.deleteTopic("TEST-TOPIC-A");
+            CLUSTER.deleteTopic("TEST-TOPIC-A");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return assignedTopics.equals(expectedSecondAssignment);
-            }
-        }, STREAM_TASKS_NOT_UPDATED);
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    synchronized (assignedTopics) {
+                        return assignedTopics.equals(expectedSecondAssignment);
+                    }
+                }
+            }, STREAM_TASKS_NOT_UPDATED);
+        } finally {
+            streams.close(5, TimeUnit.SECONDS);
+        }
     }
 
     @SuppressWarnings("deprecation")
@@ -238,12 +248,12 @@ public class RegexSourceIntegrationTest {
         final long thirtySecondTimeout = 30 * 1000;
 
         final TopologyBuilder builder = new TopologyBuilder()
-                .addSource("ingest", Pattern.compile("topic-\\d+"))
-                .addProcessor("my-processor", processorSupplier, "ingest")
-                .addStateStore(stateStoreSupplier, "my-processor");
+            .addSource("ingest", Pattern.compile("topic-\\d+"))
+            .addProcessor("my-processor", processorSupplier, "ingest")
+            .addStateStore(stateStoreSupplier, "my-processor");
 
 
-        streams = new KafkaStreams(builder, streamsConfiguration);
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         try {
             streams.start();
 
@@ -259,7 +269,7 @@ public class RegexSourceIntegrationTest {
             TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
 
         } finally {
-            streams.close();
+            streams.close(5, TimeUnit.SECONDS);
         }
     }
 
@@ -287,31 +297,35 @@ public class RegexSourceIntegrationTest {
         pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        streams = new KafkaStreams(builder.build(), streamsConfiguration);
-        streams.start();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        try {
+            streams.start();
 
-        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+            final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime);
 
-        final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+            final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
-        final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
-        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
-        final List<String> actualValues = new ArrayList<>(6);
+            final List<String> expectedReceivedValues = Arrays.asList(topicATestMessage, topic1TestMessage, topic2TestMessage, topicCTestMessage, topicYTestMessage, topicZTestMessage);
+            final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
+            final List<String> actualValues = new ArrayList<>(6);
 
-        for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
-            actualValues.add(receivedKeyValue.value);
-        }
+            for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
+                actualValues.add(receivedKeyValue.value);
+            }
 
-        Collections.sort(actualValues);
-        Collections.sort(expectedReceivedValues);
-        assertThat(actualValues, equalTo(expectedReceivedValues));
+            Collections.sort(actualValues);
+            Collections.sort(expectedReceivedValues);
+            assertThat(actualValues, equalTo(expectedReceivedValues));
+        } finally {
+            streams.close(5, TimeUnit.SECONDS);
+        }
     }
 
     @Test
@@ -404,18 +418,22 @@ public class RegexSourceIntegrationTest {
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
 
-        streams = new KafkaStreams(builder.build(), streamsConfiguration);
-        streams.start();
+        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+        try {
+            streams.start();
 
-        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+            final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
 
-        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime);
+            IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);
 
-        final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+            final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
 
-        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
-        fail("Should not get here");
+            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000);
+            fail("Should not get here");
+        } finally {
+            streams.close(5, TimeUnit.SECONDS);
+        }
     }
 
     private static class TheConsumerRebalanceListener implements ConsumerRebalanceListener {
@@ -429,16 +447,20 @@ public class RegexSourceIntegrationTest {
 
         @Override
         public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
-            assignedTopics.clear();
+            synchronized (assignedTopics) {
+                assignedTopics.clear();
+            }
             listener.onPartitionsRevoked(partitions);
         }
 
         @Override
         public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
-            for (final TopicPartition partition : partitions) {
-                assignedTopics.add(partition.topic());
+            synchronized (assignedTopics) {
+                for (final TopicPartition partition : partitions) {
+                    assignedTopics.add(partition.topic());
+                }
+                Collections.sort(assignedTopics);
             }
-            Collections.sort(assignedTopics);
             listener.onPartitionsAssigned(partitions);
         }
     }