Posted to commits@flink.apache.org by ma...@apache.org on 2022/11/10 18:16:51 UTC

[flink] branch release-1.16 updated (7ae60884e5e -> 9e7ebdc6713)

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 7ae60884e5e [hotfix] Document connector_artifact shortcode
     new 0df3c6668eb [FLINK-29914][tests] Wait for Kafka topic creation/deletion
     new 9e7ebdc6713 [FLINK-24119][tests] Add random to Kafka tests topic name

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connector/kafka/sink/KafkaSinkITCase.java      |  5 ++--
 .../sink/testutils/KafkaSinkExternalContext.java   | 12 ++------
 .../kafka/source/reader/KafkaSourceReaderTest.java |  7 +++--
 .../testutils/KafkaSourceExternalContext.java      |  6 ++--
 .../kafka/FlinkKafkaInternalProducerITCase.java    |  2 +-
 .../connectors/kafka/KafkaConsumerTestBase.java    | 35 ++++++++++------------
 .../streaming/connectors/kafka/KafkaITCase.java    |  3 +-
 .../connectors/kafka/KafkaProducerTestBase.java    | 20 ++++++++-----
 .../kafka/KafkaShortRetentionTestBase.java         |  3 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 14 ++-------
 .../shuffle/KafkaShuffleExactlyOnceITCase.java     |  6 ++--
 .../kafka/shuffle/KafkaShuffleITCase.java          | 13 ++++----
 .../connectors/kafka/table/KafkaTableITCase.java   | 25 +++++++++-------
 .../connectors/kafka/table/KafkaTableTestBase.java | 19 ++++++++----
 .../tests/util/kafka/KafkaContainerClient.java     | 12 ++++++--
 .../flink/tests/util/kafka/SmokeKafkaITCase.java   |  8 +++--
 16 files changed, 105 insertions(+), 85 deletions(-)


[flink] 02/02: [FLINK-24119][tests] Add random to Kafka tests topic name

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9e7ebdc671386a8127ddf5affb66e997d877cb7b
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Fri Oct 28 16:15:29 2022 +0200

    [FLINK-24119][tests] Add random to Kafka tests topic name
---
 .../kafka/FlinkKafkaInternalProducerITCase.java    |  2 +-
 .../connectors/kafka/KafkaConsumerTestBase.java    | 26 +++++++++++-----------
 .../streaming/connectors/kafka/KafkaITCase.java    |  3 ++-
 .../connectors/kafka/KafkaProducerTestBase.java    | 20 ++++++++++-------
 .../kafka/KafkaShortRetentionTestBase.java         |  3 ++-
 .../shuffle/KafkaShuffleExactlyOnceITCase.java     |  6 +++--
 .../kafka/shuffle/KafkaShuffleITCase.java          | 13 ++++++-----
 .../connectors/kafka/table/KafkaTableITCase.java   | 25 ++++++++++++---------
 8 files changed, 57 insertions(+), 41 deletions(-)
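
The recurring fix throughout this diff is to suffix every test topic name with a random UUID, so that retried or concurrently running tests cannot collide on a topic left over from an earlier run. A minimal sketch of the idiom (illustrative only; createTestTopic/deleteTestTopic are the helpers from the Kafka test base classes changed below, and the surrounding test values are placeholders):

    import java.util.UUID;

    // Unique per test execution; stale topics from earlier or parallel
    // runs of the same test can no longer clash with this one.
    final String topic = "oneToOneTopic-" + UUID.randomUUID();
    createTestTopic(topic, 1, 1);
    // ... exercise the producer/consumer against `topic` ...
    deleteTestTopic(topic);

Note that string concatenation invokes UUID.toString() implicitly, which is why the hunks below also drop the previously explicit .toString() calls.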

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
index 79f12abb95e..b9ed7d92f14 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java
@@ -204,7 +204,7 @@ public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
 
     @Test(timeout = 30000L)
     public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
-        String topic = "flink-kafka-producer-txn-coordinator-changed";
+        String topic = "flink-kafka-producer-txn-coordinator-changed-" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
         Producer<String, String> kafkaProducer = new FlinkKafkaInternalProducer<>(extraProperties);
         try {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ac5eef772b2..a8596ee7cc6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -805,8 +805,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
      */
     @RetryOnException(times = 2, exception = NotLeaderForPartitionException.class)
     public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
-        final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
-        final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();
+        final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID();
+        final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID();
 
         final int parallelism = 3;
         final int elementsPerPartition = 100;
@@ -944,7 +944,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
      */
     public void runOneToOneExactlyOnceTest() throws Exception {
 
-        final String topic = "oneToOneTopic";
+        final String topic = "oneToOneTopic-" + UUID.randomUUID();
         final int parallelism = 5;
         final int numElementsPerPartition = 1000;
         final int totalElements = parallelism * numElementsPerPartition;
@@ -992,7 +992,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
      * Flink source will read multiple Kafka partitions.
      */
     public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
-        final String topic = "oneToManyTopic";
+        final String topic = "oneToManyTopic-" + UUID.randomUUID();
         final int numPartitions = 5;
         final int numElementsPerPartition = 1000;
         final int totalElements = numPartitions * numElementsPerPartition;
@@ -1042,7 +1042,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
      * means that some Flink sources will read no partitions.
      */
     public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
-        final String topic = "manyToOneTopic";
+        final String topic = "manyToOneTopic-" + UUID.randomUUID();
         final int numPartitions = 5;
         final int numElementsPerPartition = 1000;
         final int totalElements = numPartitions * numElementsPerPartition;
@@ -1094,7 +1094,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
     /** Tests that the source can be properly canceled when reading full partitions. */
     public void runCancelingOnFullInputTest() throws Exception {
-        final String topic = "cancelingOnFullTopic";
+        final String topic = "cancelingOnFullTopic-" + UUID.randomUUID();
 
         final int parallelism = 3;
         createTestTopic(topic, parallelism, 1);
@@ -1168,7 +1168,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
     /** Tests that the source can be properly canceled when reading empty partitions. */
     public void runCancelingOnEmptyInputTest() throws Exception {
-        final String topic = "cancelingOnEmptyInputTopic";
+        final String topic = "cancelingOnEmptyInputTopic-" + UUID.randomUUID();
 
         final int parallelism = 3;
         createTestTopic(topic, parallelism, 1);
@@ -1238,7 +1238,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
         // create topics with content
         final List<String> topics = new ArrayList<>();
         for (int i = 0; i < numTopics; i++) {
-            final String topic = topicNamePrefix + i;
+            final String topic = topicNamePrefix + i + "-" + UUID.randomUUID();
             topics.add(topic);
             // create topic
             createTestTopic(topic, i + 1 /*partitions*/, 1);
@@ -1343,7 +1343,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
      */
     public void runBigRecordTestTopology() throws Exception {
 
-        final String topic = "bigRecordTestTopic";
+        final String topic = "bigRecordTestTopic-" + UUID.randomUUID();
         final int parallelism = 1; // otherwise, the kafka mini clusters may run out of heap space
 
         createTestTopic(topic, parallelism, 1);
@@ -1508,7 +1508,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
     }
 
     public void runKeyValueTest() throws Exception {
-        final String topic = "keyvaluetest";
+        final String topic = "keyvaluetest-" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
         final int elementCount = 5000;
 
@@ -1608,7 +1608,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
      * @throws Exception
      */
     public void runAllDeletesTest() throws Exception {
-        final String topic = "alldeletestest";
+        final String topic = "alldeletestest-" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
         final int elementCount = 300;
 
@@ -1792,7 +1792,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
     public void runMetricsTest() throws Throwable {
 
         // create a stream with 5 topics
-        final String topic = "metricsStream";
+        final String topic = "metricsStream-" + UUID.randomUUID();
         createTestTopic(topic, 5, 1);
 
         final Tuple1<Throwable> error = new Tuple1<>(null);
@@ -2244,7 +2244,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 
         for (int attempt = 1; attempt <= maxNumAttempts; attempt++) {
 
-            final String topicName = baseTopicName + '-' + attempt;
+            final String topicName = baseTopicName + '-' + attempt + '-' + UUID.randomUUID();
 
             LOG.info("Writing attempt #" + attempt);
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 4bbec3620c2..68db69187f7 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -46,6 +46,7 @@ import javax.annotation.Nullable;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.UUID;
 
 /** IT cases for Kafka. */
 public class KafkaITCase extends KafkaConsumerTestBase {
@@ -187,7 +188,7 @@ public class KafkaITCase extends KafkaConsumerTestBase {
     @Test(timeout = 60000)
     public void testTimestamps() throws Exception {
 
-        final String topic = "tstopic";
+        final String topic = "tstopic-" + UUID.randomUUID();
         createTestTopic(topic, 3, 1);
 
         // ---------- Produce an event time stream into Kafka -------------------
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index b578bc0b6d8..8bcee368d66 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -53,6 +53,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -97,10 +98,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
         try {
             LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
 
-            final String defaultTopic = "defaultTopic";
+            final String defaultTopic = "defaultTopic-" + UUID.randomUUID();
             final int defaultTopicPartitions = 2;
 
-            final String dynamicTopic = "dynamicTopic";
+            final String dynamicTopic = "dynamicTopic-" + UUID.randomUUID();
             final int dynamicTopicPartitions = 3;
 
             createTestTopic(defaultTopic, defaultTopicPartitions, 1);
@@ -225,15 +226,18 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
      * broker to check whether flushed records since last checkpoint were not duplicated.
      */
     protected void testExactlyOnce(boolean regularSink, int sinksCount) throws Exception {
-        final String topic =
+        final String topicNamePrefix =
                 (regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator")
                         + sinksCount;
         final int partition = 0;
         final int numElements = 1000;
         final int failAfterElements = 333;
 
+        final List<String> topics = new ArrayList<>();
         for (int i = 0; i < sinksCount; i++) {
-            createTestTopic(topic + i, 1, 1);
+            final String topic = topicNamePrefix + i + "-" + UUID.randomUUID();
+            topics.add(topic);
+            createTestTopic(topic, 1, 1);
         }
 
         TypeInformationSerializationSchema<Integer> schema =
@@ -273,11 +277,11 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 
             if (regularSink) {
                 StreamSink<Integer> kafkaSink =
-                        kafkaServer.getProducerSink(topic + i, schema, properties, partitioner);
+                        kafkaServer.getProducerSink(topics.get(i), schema, properties, partitioner);
                 inputStream.addSink(kafkaSink.getUserFunction());
             } else {
                 kafkaServer.produceIntoKafka(
-                        inputStream, topic + i, schema, properties, partitioner);
+                        inputStream, topics.get(i), schema, properties, partitioner);
             }
         }
 
@@ -286,8 +290,8 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 
         for (int i = 0; i < sinksCount; i++) {
             // assert that before failure we successfully snapshot/flushed all expected elements
-            assertExactlyOnceForTopic(properties, topic + i, expectedElements);
-            deleteTestTopic(topic + i);
+            assertExactlyOnceForTopic(properties, topics.get(i), expectedElements);
+            deleteTestTopic(topics.get(i));
         }
     }
 
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 990047d99c5..9fb16d40c7d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.Properties;
+import java.util.UUID;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 
@@ -128,7 +129,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
     private static boolean stopProducer = false;
 
     public void runAutoOffsetResetTest() throws Exception {
-        final String topic = "auto-offset-reset-test";
+        final String topic = "auto-offset-reset-test-" + UUID.randomUUID();
 
         final int parallelism = 1;
         final int elementsPerPartition = 50000;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java
index 3deab467dba..7d37f6c34fe 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java
@@ -31,6 +31,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.util.UUID;
+
 import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
 import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
 import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
@@ -110,7 +112,7 @@ public class KafkaShuffleExactlyOnceITCase extends KafkaShuffleTestBase {
     private void testKafkaShuffleFailureRecovery(
             int numElementsPerProducer, TimeCharacteristic timeCharacteristic) throws Exception {
 
-        String topic = topic("failure_recovery", timeCharacteristic);
+        String topic = topic("failure_recovery-" + UUID.randomUUID(), timeCharacteristic);
         final int numberOfPartitions = 1;
         final int producerParallelism = 1;
         final int failAfterElements = numElementsPerProducer * numberOfPartitions * 2 / 3;
@@ -150,7 +152,7 @@ public class KafkaShuffleExactlyOnceITCase extends KafkaShuffleTestBase {
      */
     private void testAssignedToPartitionFailureRecovery(
             int numElementsPerProducer, TimeCharacteristic timeCharacteristic) throws Exception {
-        String topic = topic("partition_failure_recovery", timeCharacteristic);
+        String topic = topic("partition_failure_recovery-" + UUID.randomUUID(), timeCharacteristic);
         final int numberOfPartitions = 3;
         final int producerParallelism = 2;
         final int failAfterElements = numElementsPerProducer * producerParallelism * 2 / 3;
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
index 50b61af5eed..9a0a14da39a 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
@@ -48,6 +48,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
 import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
 import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
@@ -181,7 +182,7 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results =
                 testKafkaShuffleProducer(
-                        topic("test_watermark_broadcast", EventTime),
+                        topic("test_watermark_broadcast-" + UUID.randomUUID(), EventTime),
                         env,
                         numberOfPartitions,
                         producerParallelism,
@@ -250,7 +251,7 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
      */
     private void testKafkaShuffle(int numElementsPerProducer, TimeCharacteristic timeCharacteristic)
             throws Exception {
-        String topic = topic("test_simple", timeCharacteristic);
+        String topic = topic("test_simple-" + UUID.randomUUID(), timeCharacteristic);
         final int numberOfPartitions = 1;
         final int producerParallelism = 1;
 
@@ -287,7 +288,7 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
      */
     private void testAssignedToPartition(
             int numElementsPerProducer, TimeCharacteristic timeCharacteristic) throws Exception {
-        String topic = topic("test_assigned_to_partition", timeCharacteristic);
+        String topic = topic("test_assigned_to_partition-" + UUID.randomUUID(), timeCharacteristic);
         final int numberOfPartitions = 3;
         final int producerParallelism = 2;
 
@@ -331,7 +332,7 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
      */
     private void testWatermarkIncremental(int numElementsPerProducer) throws Exception {
         TimeCharacteristic timeCharacteristic = EventTime;
-        String topic = topic("test_watermark_incremental", timeCharacteristic);
+        String topic = topic("test_watermark_incremental-" + UUID.randomUUID(), timeCharacteristic);
         final int numberOfPartitions = 3;
         final int producerParallelism = 2;
 
@@ -375,7 +376,9 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
         Collection<ConsumerRecord<byte[], byte[]>> records =
                 Iterables.getOnlyElement(
                         testKafkaShuffleProducer(
-                                        topic("test_serde", timeCharacteristic),
+                                        topic(
+                                                "test_serde-" + UUID.randomUUID(),
+                                                timeCharacteristic),
                                         env,
                                         1,
                                         1,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 273df6465c8..975e5cd4375 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -49,6 +49,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -91,7 +92,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
     public void testKafkaSourceSink() throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
-        final String topic = "tstopic_" + format;
+        final String topic = "tstopic_" + format + "_" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
 
         // ---------- Produce an event time stream into Kafka -------------------
@@ -197,7 +198,10 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         List<String> currencies = Arrays.asList("Euro", "Dollar", "Yen", "Dummy");
         List<String> topics =
                 currencies.stream()
-                        .map(currency -> String.format("%s_%s", currency, format))
+                        .map(
+                                currency ->
+                                        String.format(
+                                                "%s_%s_%s", currency, format, UUID.randomUUID()))
                         .collect(Collectors.toList());
         // Because kafka connector currently doesn't support write data into multiple topic
         // together,
@@ -272,7 +276,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
     public void testKafkaSourceSinkWithMetadata() throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
-        final String topic = "metadata_topic_" + format;
+        final String topic = "metadata_topic_" + format + "_" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
 
         // ---------- Produce an event time stream into Kafka -------------------
@@ -364,7 +368,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
     public void testKafkaSourceSinkWithKeyAndPartialValue() throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
-        final String topic = "key_partial_value_topic_" + format;
+        final String topic = "key_partial_value_topic_" + format + "_" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
 
         // ---------- Produce an event time stream into Kafka -------------------
@@ -445,7 +449,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
     public void testKafkaSourceSinkWithKeyAndFullValue() throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
-        final String topic = "key_full_value_topic_" + format;
+        final String topic = "key_full_value_topic_" + format + "_" + UUID.randomUUID();
         createTestTopic(topic, 1, 1);
 
         // ---------- Produce an event time stream into Kafka -------------------
@@ -529,10 +533,11 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
-        final String orderTopic = "temporal_join_topic_order_" + format;
+        final String orderTopic = "temporal_join_topic_order_" + format + "_" + UUID.randomUUID();
         createTestTopic(orderTopic, 1, 1);
 
-        final String productTopic = "temporal_join_topic_product_" + format;
+        final String productTopic =
+                "temporal_join_topic_product_" + format + "_" + UUID.randomUUID();
         createTestTopic(productTopic, 1, 1);
 
         // ---------- Produce an event time stream into Kafka -------------------
@@ -665,7 +670,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
     public void testPerPartitionWatermarkKafka() throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
-        final String topic = "per_partition_watermark_topic_" + format;
+        final String topic = "per_partition_watermark_topic_" + format + "_" + UUID.randomUUID();
         createTestTopic(topic, 4, 1);
 
         // ---------- Produce an event time stream into Kafka -------------------
@@ -755,7 +760,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
     public void testPerPartitionWatermarkWithIdleSource() throws Exception {
         // we always use a different topic name for each parameterized topic,
         // in order to make sure the topic can be created.
-        final String topic = "idle_partition_watermark_topic_" + format;
+        final String topic = "idle_partition_watermark_topic_" + format + "_" + UUID.randomUUID();
         createTestTopic(topic, 4, 1);
 
         // ---------- Produce an event time stream into Kafka -------------------
@@ -968,7 +973,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         // in order to make sure the topic can be created.
         final String resetStrategy = "none";
         final String tableName = resetStrategy + "Table";
-        final String topic = "groupOffset_" + format;
+        final String topic = "groupOffset_" + format + "_" + UUID.randomUUID();
         String groupId = resetStrategy + (new Random()).nextInt();
 
         TableResult tableResult = null;


[flink] 01/02: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0df3c6668eb399eaa0e5e95626b2794614566803
Author: Gabor Somogyi <ga...@apple.com>
AuthorDate: Mon Nov 7 10:21:05 2022 +0100

    [FLINK-29914][tests] Wait for Kafka topic creation/deletion
---
 .../flink/connector/kafka/sink/KafkaSinkITCase.java   |  5 ++---
 .../sink/testutils/KafkaSinkExternalContext.java      | 12 ++----------
 .../kafka/source/reader/KafkaSourceReaderTest.java    |  7 +++++--
 .../kafka/testutils/KafkaSourceExternalContext.java   |  6 ++++--
 .../connectors/kafka/KafkaConsumerTestBase.java       |  9 +++------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java    | 14 +++-----------
 .../connectors/kafka/table/KafkaTableTestBase.java    | 19 ++++++++++++++-----
 .../flink/tests/util/kafka/KafkaContainerClient.java  | 12 ++++++++++--
 .../flink/tests/util/kafka/SmokeKafkaITCase.java      |  8 +++++---
 9 files changed, 48 insertions(+), 44 deletions(-)
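
The common thread in this diff is that Kafka's AdminClient operations are asynchronous: createTopics() and deleteTopics() return as soon as the request is submitted, so a test could proceed before the topic actually exists (or is fully gone). The fix is to block on the returned future. A minimal sketch of the idiom (illustrative only; properties, topic, and numPartitions stand in for each test's own values):

    import java.util.Collections;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    try (AdminClient admin = AdminClient.create(properties)) {
        // createTopics() only submits the request; all().get() blocks until
        // the broker confirms the topic exists, or throws on failure.
        admin.createTopics(
                        Collections.singletonList(
                                new NewTopic(topic, numPartitions, (short) 1)))
                .all()
                .get();
    }

The hunks below also replace the bounded get(timeout, unit) variants with an unbounded get(), presumably leaving deadline enforcement to the tests' own timeouts.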

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index be8180ee398..a5e7f1f1334 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -99,7 +99,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -445,13 +444,13 @@ public class KafkaSinkITCase extends TestLogger {
                 admin.createTopics(
                         Collections.singletonList(
                                 new NewTopic(topic, numPartitions, replicationFactor)));
-        result.all().get(1, TimeUnit.MINUTES);
+        result.all().get();
     }
 
     private void deleteTestTopic(String topic)
             throws ExecutionException, InterruptedException, TimeoutException {
         final DeleteTopicsResult result = admin.deleteTopics(Collections.singletonList(topic));
-        result.all().get(1, TimeUnit.MINUTES);
+        result.all().get();
     }
 
     private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
index 34e4dbff61d..ee9ac2193ce 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -57,7 +57,6 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
 
@@ -67,7 +66,6 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkExternalContext.class);
 
     private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
-    private static final long DEFAULT_TIMEOUT = 30L;
     private static final int RANDOM_STRING_MAX_LENGTH = 50;
     private static final int NUM_RECORDS_UPPER_BOUND = 500;
     private static final int NUM_RECORDS_LOWER_BOUND = 100;
@@ -100,10 +98,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
                 replicationFactor);
         NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
         try {
-            kafkaAdminClient
-                    .createTopics(Collections.singletonList(newTopic))
-                    .all()
-                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+            kafkaAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();
         } catch (Exception e) {
             throw new RuntimeException(String.format("Cannot create topic '%s'", topicName), e);
         }
@@ -112,10 +107,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
     private void deleteTopic(String topicName) {
         LOG.debug("Deleting Kafka topic {}", topicName);
         try {
-            kafkaAdminClient
-                    .deleteTopics(Collections.singletonList(topicName))
-                    .all()
-                    .get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+            kafkaAdminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
         } catch (Exception e) {
             if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                 throw new RuntimeException(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index e92fd75c76f..de9246cec1b 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -88,8 +88,11 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
         try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
-            adminClient.createTopics(
-                    Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)));
+            adminClient
+                    .createTopics(
+                            Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)))
+                    .all()
+                    .get();
             // Use the admin client to trigger the creation of internal __consumer_offsets topic.
             // This makes sure that we won't see unavailable coordinator in the tests.
             waitUtil(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
index 5f76ac6b2bd..67b214bc1c6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
@@ -204,8 +204,10 @@ public class KafkaSourceExternalContext implements DataStreamSourceExternalConte
                     new TopicPartition(topicName, numPartitions));
         } else {
             LOG.info("Creating topic '{}'", topicName);
-            adminClient.createTopics(
-                    Collections.singletonList(new NewTopic(topicName, 1, (short) 1)));
+            adminClient
+                    .createTopics(Collections.singletonList(new NewTopic(topicName, 1, (short) 1)))
+                    .all()
+                    .get();
             return new KafkaPartitionDataWriter(
                     getKafkaProducerProperties(0), new TopicPartition(topicName, 0));
         }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 899463d6d11..ac5eef772b2 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1253,15 +1253,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
                         new RichParallelSourceFunction<Tuple3<Integer, Integer, String>>() {
 
                             @Override
-                            public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx)
-                                    throws Exception {
+                            public void run(SourceContext<Tuple3<Integer, Integer, String>> ctx) {
                                 int partition = getRuntimeContext().getIndexOfThisSubtask();
 
                                 for (int topicId = 0; topicId < numTopics; topicId++) {
                                     for (int i = 0; i < numElements; i++) {
                                         ctx.collect(
-                                                new Tuple3<>(
-                                                        partition, i, topicNamePrefix + topicId));
+                                                new Tuple3<>(partition, i, topics.get(topicId)));
                                     }
                                 }
                             }
@@ -1333,8 +1331,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
         tryExecute(env, "Count elements from the topics");
 
         // delete all topics again
-        for (int i = 0; i < numTopics; i++) {
-            final String topic = topicNamePrefix + i;
+        for (String topic : topics) {
             deleteTestTopic(topic);
         }
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index df2824ae02d..0ef16f76458 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -142,15 +142,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
     private void tryDelete(AdminClient adminClient, String topic) throws Exception {
         try {
-            adminClient
-                    .deleteTopics(Collections.singleton(topic))
-                    .all()
-                    .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            adminClient.deleteTopics(Collections.singleton(topic)).all().get();
             CommonTestUtils.waitUtil(
                     () -> {
                         try {
-                            return adminClient.listTopics().listings()
-                                    .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS).stream()
+                            return adminClient.listTopics().listings().get().stream()
                                     .map(TopicListing::name)
                                     .noneMatch((name) -> name.equals(topic));
                         } catch (Exception e) {
@@ -164,11 +160,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
             LOG.info(
                     "Did not receive delete topic response within {} seconds. Checking if it succeeded",
                     REQUEST_TIMEOUT_SECONDS);
-            if (adminClient
-                    .listTopics()
-                    .names()
-                    .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
-                    .contains(topic)) {
+            if (adminClient.listTopics().names().get().contains(topic)) {
                 throw new Exception("Topic still exists after timeout", e);
             }
         }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 20d03af74d2..10b11ab2b58 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -53,7 +53,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** Base class for Kafka Table IT Cases. */
@@ -135,8 +134,16 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
         try (AdminClient admin = AdminClient.create(properties)) {
             admin.createTopics(
-                    Collections.singletonList(
-                            new NewTopic(topic, numPartitions, (short) replicationFactor)));
+                            Collections.singletonList(
+                                    new NewTopic(topic, numPartitions, (short) replicationFactor)))
+                    .all()
+                    .get();
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Fail to create topic [%s partitions: %d replication factor: %d].",
+                            topic, numPartitions, replicationFactor),
+                    e);
         }
     }
 
@@ -145,7 +152,7 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
         try (AdminClient admin = AdminClient.create(properties)) {
             ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(groupId);
-            return result.partitionsToOffsetAndMetadata().get(20, TimeUnit.SECONDS);
+            return result.partitionsToOffsetAndMetadata().get();
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format("Fail to get consumer offsets with the group id [%s].", groupId),
@@ -157,7 +164,9 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
         Map<String, Object> properties = new HashMap<>();
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
         try (AdminClient admin = AdminClient.create(properties)) {
-            admin.deleteTopics(Collections.singletonList(topic));
+            admin.deleteTopics(Collections.singletonList(topic)).all().get();
+        } catch (Exception e) {
+            throw new IllegalStateException(String.format("Fail to delete topic [%s].", topic), e);
         }
     }
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
index 6d581ba74e5..d3f45e0e60a 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
@@ -68,8 +68,16 @@ public class KafkaContainerClient {
                 CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, container.getBootstrapServers());
         try (AdminClient admin = AdminClient.create(properties)) {
             admin.createTopics(
-                    Collections.singletonList(
-                            new NewTopic(topic, numPartitions, (short) replicationFactor)));
+                            Collections.singletonList(
+                                    new NewTopic(topic, numPartitions, (short) replicationFactor)))
+                    .all()
+                    .get();
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Fail to create topic [%s partitions: %d replication factor: %d].",
+                            topic, numPartitions, replicationFactor),
+                    e);
         }
     }
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
index b5072b1a430..e350846f3ff 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
@@ -137,9 +137,11 @@ public class SmokeKafkaITCase {
         // create the required topics
         final short replicationFactor = 1;
         admin.createTopics(
-                Lists.newArrayList(
-                        new NewTopic(inputTopic, 1, replicationFactor),
-                        new NewTopic(outputTopic, 1, replicationFactor)));
+                        Lists.newArrayList(
+                                new NewTopic(inputTopic, 1, replicationFactor),
+                                new NewTopic(outputTopic, 1, replicationFactor)))
+                .all()
+                .get();
 
         producer.send(new ProducerRecord<>(inputTopic, 1));
         producer.send(new ProducerRecord<>(inputTopic, 2));