Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 04:37:53 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest

guozhangwang commented on a change in pull request #8799:
URL: https://github.com/apache/kafka/pull/8799#discussion_r435684743



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -124,7 +124,7 @@ public void setUp() throws InterruptedException {
         properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
-        streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test-" + topicSuffixGenerator.get(),

Review comment:
       Could we use
   
   ```
       @Rule
       public TestName testName = new TestName();
   ```
   
   and take the suffix from the test method name instead of `topicSuffixGenerator.get()`?
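
A minimal sketch of how the suggested rule could feed the application id, assuming the method name is obtained in `setUp()` via `testName.getMethodName()` (JUnit 4). The helper `AppIdNames.applicationIdFor` is hypothetical and not part of the Kafka code base. It replaces characters outside `[A-Za-z0-9._-]` because the application id is also used as a prefix for internal topic names, and parameterized test names can contain brackets.

```java
// Hypothetical helper for illustration only; not part of Kafka or of this PR.
final class AppIdNames {
    private AppIdNames() { }

    // Builds "<prefix>-<sanitized test method name>".
    // In a JUnit 4 test, testMethodName would come from testName.getMethodName().
    static String applicationIdFor(final String prefix, final String testMethodName) {
        // Keep only characters that are legal in topic names; map the rest to '_'.
        final String safe = testMethodName.replaceAll("[^A-Za-z0-9._-]", "_");
        return prefix + "-" + safe;
    }

    public static void main(final String[] args) {
        // prints "regex-source-integration-test-testRegexMatchesTopicsAWhenCreated"
        System.out.println(applicationIdFor(
            "regex-source-integration-test", "testRegexMatchesTopicsAWhenCreated"));
    }
}
```

Compared with a generated suffix, a name-based suffix is deterministic, so logs and state directories can be traced back to the test that produced them.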

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -142,83 +142,89 @@ public void tearDown() throws IOException {
 
     @Test
     public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+        try {
+            final Serde<String> stringSerde = Serdes.String();
 
-        final Serde<String> stringSerde = Serdes.String();
-
-        final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
-        // we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed
-        // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always
-        // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test
-        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+            final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
+            // we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed
+            // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always
+            // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test
+            final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
 
-        CLUSTER.createTopic("TEST-TOPIC-1");
+            CLUSTER.createTopic("TEST-TOPIC-1");
 
-        final StreamsBuilder builder = new StreamsBuilder();
+            final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+            final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
 
-        pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
-        final List<String> assignedTopics = new CopyOnWriteArrayList<>();
-        streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
-            @Override
-            public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
-                return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
-                    @Override
-                    public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
-                        super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
-                    }
-                };
+            pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
+            final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+            streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
+                @Override
+                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
+                    return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+                        @Override
+                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
+                            super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
+                        }
+                    };
 
-            }
-        });
+                }
+            });
 
-        streams.start();
-        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+            streams.start();
+            TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
 
-        CLUSTER.createTopic("TEST-TOPIC-2");
+            CLUSTER.createTopic("TEST-TOPIC-2");
 
-        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+            TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
-        streams.close();
-        CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+            streams.close();

Review comment:
       Why do we need to call `streams.close()` inside the test method, given that it is always called in `tearDown`? Ditto below.
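
To illustrate the point about `tearDown`, here is a small stand-alone sketch of the JUnit 4 contract that an `@After` method runs whether or not the test body throws. `FakeStreams` and `TearDownSketch` are hypothetical stand-ins, not the real `KafkaStreams` class or the JUnit runner. The sketch assumes, as the comment states, that `tearDown` closes the instance.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a closeable client such as KafkaStreams.
final class FakeStreams implements AutoCloseable {
    final AtomicInteger closeCalls = new AtomicInteger();

    @Override
    public void close() {
        closeCalls.incrementAndGet();
    }
}

final class TearDownSketch {
    private TearDownSketch() { }

    // Mimics the runner: the tear-down step executes even if the test body throws.
    static void runTest(final Runnable testBody, final Runnable tearDown) {
        try {
            testBody.run();
        } finally {
            tearDown.run();
        }
    }

    public static void main(final String[] args) {
        final FakeStreams streams = new FakeStreams();
        try {
            // The test body fails before it could reach an explicit close() call.
            runTest(() -> { throw new IllegalStateException("failed mid-test"); }, streams::close);
        } catch (final IllegalStateException expected) {
            // The failure still propagates to the caller.
        }
        // prints "1": the tear-down step closed the instance despite the failure
        System.out.println(streams.closeCalls.get());
    }
}
```

Under that assumption, an explicit close at the end of the test body adds no cleanup that `tearDown` does not already perform, and it is skipped anyway when an earlier assertion fails.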



