You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/10 01:53:51 UTC

[kafka] branch 3.4 updated: KAFKA-14454: Making unique StreamsConfig for tests (#12971)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 86fa681d002 KAFKA-14454: Making unique StreamsConfig for tests (#12971)
86fa681d002 is described below

commit 86fa681d00209b7f05ffbfc5c9ce0ff9c21964de
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Sat Dec 10 07:21:42 2022 +0530

    KAFKA-14454: Making unique StreamsConfig for tests (#12971)
    
    The test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, newly added as part of KIP-837, passes when run individually but fails when run as part of the whole integration test class, and was therefore marked as ignored.
    
    That seems to have been caused by the way StreamsConfig was initialised: any new test would have used the same names, so the second test never got to the desired state. With this PR, every test gets a unique app name, which seems to have fixed the issue. This PR also includes a couple of cosmetic changes.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 ...yInnerJoinCustomPartitionerIntegrationTest.java | 36 ++++++++++++----------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 1a9e4635bb1..fbd2c767d87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -21,6 +21,11 @@ import static java.time.Duration.ofSeconds;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
@@ -65,7 +70,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
 
 @Timeout(600)
 @Tag("integration")
@@ -77,14 +82,13 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
     private final static String TABLE_1 = "table1";
     private final static String TABLE_2 = "table2";
     private final static String OUTPUT = "output-";
-    private final Properties streamsConfig = getStreamsConfig();
-    private final Properties streamsConfigTwo = getStreamsConfig();
-    private final Properties streamsConfigThree = getStreamsConfig();
+    private Properties streamsConfig;
+    private Properties streamsConfigTwo;
+    private Properties streamsConfigThree;
     private KafkaStreams streams;
     private KafkaStreams streamsTwo;
     private KafkaStreams streamsThree;
     private final static Properties CONSUMER_CONFIG = new Properties();
-
     private final static Properties PRODUCER_CONFIG_1 = new Properties();
     private final static Properties PRODUCER_CONFIG_2 = new Properties();
 
@@ -147,8 +151,12 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
     }
 
     @BeforeEach
-    public void before() throws IOException {
+    public void before(final TestInfo testInfo) throws IOException {
         final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        streamsConfig = getStreamsConfig(safeTestName);
+        streamsConfigTwo = getStreamsConfig(safeTestName);
+        streamsConfigThree = getStreamsConfig(safeTestName);
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
         streamsConfigTwo.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-2");
         streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-3");
@@ -182,9 +190,8 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         verifyKTableKTableJoin(expectedOne);
     }
 
-    @Disabled("This test works individually but fails when run along with the class. Ignoring for now.")
     @Test
-    public void shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions() throws Exception {
         final String innerJoinType = "INNER";
         final String queryableName = innerJoinType + "-store1";
 
@@ -196,19 +203,16 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
 
         for (final KafkaStreams stream: kafkaStreamsList) {
             stream.setUncaughtExceptionHandler(e -> {
-                assertEquals("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set", e.getCause().getMessage());
+                assertThat(e.getCause().getMessage(), equalTo("The partitions returned by StreamPartitioner#partitions method when used for FK join should be a singleton set"));
                 return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
             });
         }
 
         startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
 
-        // Sleeping to let the processing happen inducing the failure
-        Thread.sleep(60000);
+        // the streams applications should have shut down into `ERROR` due to the IllegalArgumentException
+        waitForApplicationState(Arrays.asList(streams, streamsTwo, streamsThree), KafkaStreams.State.ERROR, ofSeconds(60));
 
-        assertEquals(KafkaStreams.State.ERROR, streams.state());
-        assertEquals(KafkaStreams.State.ERROR, streamsTwo.state());
-        assertEquals(KafkaStreams.State.ERROR, streamsThree.state());
     }
 
     private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {
@@ -230,9 +234,9 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
         assertEquals(expectedResult, result);
     }
 
-    private static Properties getStreamsConfig() {
+    private Properties getStreamsConfig(final String testName) {
         final Properties streamsConfig = new Properties();
-        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner");
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-FKJ-Partitioner-" + testName);
         streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfig.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);