Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/07 09:27:03 UTC

[GitHub] [flink] gaborgsomogyi opened a new pull request, #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

gaborgsomogyi opened a new pull request, #21247:
URL: https://github.com/apache/flink/pull/21247

   ## What is the purpose of the change
   
   Kafka topic creation/deletion is an async operation. The API returns future(s), which must be waited on to be on the safe side. Additionally, the APIs can throw exceptions while waiting for completion. In this PR I've added more or less consistent Kafka topic creation/deletion behavior to the tests, namely a 30-second timeout in all the places where it was missing. This may or may not fix the issue (Kafka itself can be buggy), but I'm pretty sure that w/o consistent topic creation/deletion API usage we're in trouble.
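   
   A minimal sketch of the waiting pattern described above, using only the JDK. `createTopicAsync` is a hypothetical stand-in for an admin call that returns a future (it is not the Kafka `AdminClient`), and the class and method names are made up for illustration:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   final class TopicCreationWait {
       // Hypothetical stand-in for an async admin call; it only simulates broker-side work.
       static CompletableFuture<Void> createTopicAsync(String topic, long delayMillis) {
           return CompletableFuture.runAsync(() -> {
               try {
                   Thread.sleep(delayMillis);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException(e);
               }
           });
       }
   
       // Blocks until the future completes or the timeout elapses; failures are rethrown unchecked.
       static void createTopicAndWait(String topic, long delayMillis, long timeoutSeconds) {
           try {
               createTopicAsync(topic, delayMillis).get(timeoutSeconds, TimeUnit.SECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("Interrupted while creating topic " + topic, e);
           } catch (ExecutionException | TimeoutException e) {
               throw new IllegalStateException("Failed to create topic " + topic, e);
           }
       }
   
       public static void main(String[] args) {
           createTopicAndWait("test-topic", 50, 30);
           System.out.println("topic creation finished");
       }
   }
   ```
   
   The caller only continues once the simulated creation has finished, and a failure or timeout surfaces as an exception instead of being silently dropped.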
   
   ## Brief change log
   
   Wait for Kafka topic creation/deletion.
   
   ## Verifying this change
   
   Existing unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] XComp commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1306944465

   Sounds promising. The change so far looks reasonable. Good job. :-) Could you ping me as soon as you're done?




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1307194137

   > Could you ping me as soon as you're done?
   
   Ping :)




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1305448502

   > I'm just wondering whether adding the timeouts actually help our mission to stabilize the tests
   
   If we can kick this to a green state we can remove the timeouts since the stability comes from waiting for the Java `future` via `.all().get...`.
   




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1015290704


##########
flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java:
##########
@@ -261,8 +262,20 @@ private void sendMessages(String topic, String... messages) {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
         props.put(ProducerConfig.ACKS_CONFIG, "all");
 
+        int numPartitions = 1;
+        short replicationFactor = 1;

Review Comment:
   Fixed.





[GitHub] [flink] XComp commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1308632876

   @PatrickRen can you have a look at it as well since it covers FLINK-24119




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1305696827

   I've just fixed another layer of issue:
   ```
   Nov 07 13:16:23 [ERROR] org.apache.flink.streaming.connectors.kafka.KafkaITCase.testMultipleTopicsWithKafkaSerializer  Time elapsed: 240.264 s  <<< ERROR!
   Nov 07 13:16:23 org.junit.runners.model.TestTimedOutException: test timed out after 60000 milliseconds
   ```
   However this issue was still there:
   ```
   Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The request timed out.
   ```
   If the last change doesn't fix it, then I'm leaving this issue for now. It can be a tricky one if it's not caused by the previous timeouts.




[GitHub] [flink] gyfora commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1310033222

   I will merge this unless you have any objections @XComp , let's get the tests back on track :) 




[GitHub] [flink] flinkbot commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1305333376

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "cfe419e676fac5d41e0291f22d60af39e773aae6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cfe419e676fac5d41e0291f22d60af39e773aae6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * cfe419e676fac5d41e0291f22d60af39e773aae6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017847657


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -204,7 +204,7 @@ public void testFlushAfterClosed() {
 
     @Test(timeout = 30000L)
     public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
-        String topic = "flink-kafka-producer-txn-coordinator-changed";
+        String topic = "flink-kafka-producer-txn-coordinator-changed" + UUID.randomUUID();

Review Comment:
   Added `-` to all places :)





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017857996


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java:
##########
@@ -135,8 +134,16 @@ public void createTestTopic(String topic, int numPartitions, int replicationFact
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
         try (AdminClient admin = AdminClient.create(properties)) {
             admin.createTopics(
-                    Collections.singletonList(
-                            new NewTopic(topic, numPartitions, (short) replicationFactor)));
+                            Collections.singletonList(
+                                    new NewTopic(topic, numPartitions, (short) replicationFactor)))
+                    .all()
+                    .get();
+        } catch (Exception e) {
+            throw new IllegalStateException(

Review Comment:
   I would leave it for the beautification phase. I think it would be good to fix a 1+ year old issue. Pretty sure we can save several hundred CI and dev hours by leaving beautification for later :)





[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1308681238

   * Some have still timeouts included (e.g. [KafkaSinkITCase:438](https://github.com/apache/flink/blob/e95897694d59126d1949a7f2337bcc3b7ce49747/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java#L438) and [KafkaSinkITCase:432](https://github.com/apache/flink/blob/e95897694d59126d1949a7f2337bcc3b7ce49747/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java#L432), [KafkaSinkExternalContext:118](https://github.com/apache/flink/blob/025675725336cd572aa2601be525efd4995e5b84/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java#L118), ...)
   
   I've removed timeouts here too.
   
   * No special error handling (e.g. [KafkaEnumeratorTest:217](https://github.com/apache/flink/blob/e95897694d59126d1949a7f2337bcc3b7ce49747/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java#L217))
   
   I think w/o a deep understanding of what the intention was there I wouldn't change anything, so leaving that as-is was intentional. W/o super deep consideration I would say we don't intend to test Kafka topic delete functionality. If a topic can be created/modified and Flink is able to process the data, then we're fine. In my view we need a full concept w/ in-depth consideration to do proper refactoring in this area (I mean Kafka server/Consumer/Producer/AdminClient/Topic lifecycle).
   




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1305458387

   All in all we have multiple issues stacked up:
   * The exception for creating an already existing topic was ignored due to bad Kafka API usage (missing future wait)
   * Topics are not deleted in tests (which makes sense in case of an exception)
   * Somehow the producer is not super happy and is throwing a timeout exception (this may be a Kafka issue which requires a huge amount of time to debug)
   
   I think the current code should resolve the first 2 issues; the third may or may not disappear :/
   Hope it will pass; in that case we can just remove the timeouts and ship it.
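   
   To illustrate the first point, here is a small JDK-only sketch of how a failure stays hidden when the returned future is never waited on. `createExistingTopic` is a hypothetical stand-in that fails asynchronously; it does not call Kafka:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   
   final class IgnoredFailureDemo {
       // Hypothetical stand-in for an admin call that fails asynchronously,
       // e.g. because the topic already exists.
       static CompletableFuture<Void> createExistingTopic(String topic) {
           CompletableFuture<Void> result = new CompletableFuture<>();
           result.completeExceptionally(
                   new IllegalStateException("Topic '" + topic + "' already exists."));
           return result;
       }
   
       // Fire-and-forget: the failure stays inside the discarded future.
       static boolean failsWithoutWaiting(String topic) {
           try {
               createExistingTopic(topic);
               return false;
           } catch (RuntimeException e) {
               return true;
           }
       }
   
       // Waiting on the future surfaces the failure as an ExecutionException.
       static boolean failsWhenWaiting(String topic) {
           try {
               createExistingTopic(topic).get();
               return false;
           } catch (ExecutionException e) {
               return true;
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
       }
   
       public static void main(String[] args) {
           System.out.println("without waiting: " + failsWithoutWaiting("demo")); // false
           System.out.println("with waiting: " + failsWhenWaiting("demo"));       // true
       }
   }
   ```
   
   Only the variant that calls `get()` ever sees the "already exists" failure.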




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1309176863

   > I found one other timeout (but that should be the last one; all others are already covered by your PR as far as I can see): [KafkaTestEnvironmentImpl:148](https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java#L148)
   
   I've removed all w/ such pattern.




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1309794401

   @flinkbot run azure




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1310258486

   Thanks guys, opened backport PRs...




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1307084416

   I agree w/ the direction but, as you mentioned, in a follow-up PR. Right now everything is scattered, which is bad in general.
   After we've fixed the tests w/ fairly ugly code we could centralize the common tasks.
   I have the following in mind:
   * Topic creation
   * Topic deletion
   * Random name generation
   * AdminClient creation
   * Consumer creation
   * Producer creation
   




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1018279703


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java:
##########
@@ -1238,7 +1238,7 @@ public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exce
         // create topics with content
         final List<String> topics = new ArrayList<>();
         for (int i = 0; i < numTopics; i++) {
-            final String topic = topicNamePrefix + i;
+            final String topic = topicNamePrefix + i + UUID.randomUUID();

Review Comment:
   There were several places where randomness was not added, so I've now added it manually.
   I'm not in favor of having a function which is not generic. If we have `createRandomizedTestTopic` then we force devs to use randomness, but sometimes one may need fixed names. That's the reason why I say let's have an end-to-end concept.
   
   W/o too much consideration I would vote later on to have a `generateRandomTopicName(String prefix)` and a `createTopic(String topic, int numPartitions, int replicationFactor)`. This way one can play lego based on needs.
   
   There are some leftovers where no randomness was added because these tests don't have restarts, so they either pass or fail.
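   
   A rough sketch of how the two building blocks could be combined. The signatures follow the ones proposed above, but the bodies are assumptions: `createTopic` only records names in an in-memory set standing in for the broker, and the `-` separator in the generated name is an illustrative choice:
   
   ```java
   import java.util.Set;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;
   
   final class TopicHelpers {
       // In-memory stand-in for the broker's topic registry (assumption for this sketch).
       private static final Set<String> TOPICS = ConcurrentHashMap.newKeySet();
   
       // Appends a random UUID so that a retried test does not hit an existing topic.
       static String generateRandomTopicName(String prefix) {
           return prefix + "-" + UUID.randomUUID();
       }
   
       // Mimics a synchronous create: fails loudly if the topic already exists.
       static void createTopic(String topic, int numPartitions, int replicationFactor) {
           if (!TOPICS.add(topic)) {
               throw new IllegalStateException("Topic '" + topic + "' already exists.");
           }
       }
   
       public static void main(String[] args) {
           createTopic(generateRandomTopicName("my-test"), 1, 1); // randomized name
           createTopic("fixed-name", 1, 1); // fixed name where a test expects one
       }
   }
   ```
   
   Keeping name generation separate from creation lets a test opt in to randomness without being forced into it.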
   





[GitHub] [flink] XComp commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1018776191


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java:
##########
@@ -1238,7 +1238,7 @@ public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exce
         // create topics with content
         final List<String> topics = new ArrayList<>();
         for (int i = 0; i < numTopics; i++) {
-            final String topic = topicNamePrefix + i;
+            final String topic = topicNamePrefix + i + UUID.randomUUID();

Review Comment:
   > sorry for being nitpicky about this specific one but I feel like here it's even more crucial that we add a separator since we provide this i identifier to each topic which would be kind of swallowed into the UUID.
   
   I see, makes sense. You missed my initial comment, though. I shouldn't have added the second comment to this thread, I guess. Sorry for that one. But I think that the separator is important in the case above.
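   
   A tiny illustration of why the separator matters when an index is followed by a UUID. The helper names and the fixed UUID are made up for the example:
   
   ```java
   import java.util.UUID;
   
   final class TopicNameSeparator {
       // The index runs straight into the UUID and can no longer be told apart from it.
       static String withoutSeparator(String prefix, int i, UUID id) {
           return prefix + i + id;
       }
   
       // A separator keeps the index readable in the topic name.
       static String withSeparator(String prefix, int i, UUID id) {
           return prefix + i + "-" + id;
       }
   
       public static void main(String[] args) {
           UUID id = UUID.fromString("12345678-1234-1234-1234-123456789abc");
           // prints topic112345678-1234-1234-1234-123456789abc
           System.out.println(withoutSeparator("topic", 1, id));
           // prints topic1-12345678-1234-1234-1234-123456789abc
           System.out.println(withSeparator("topic", 1, id));
       }
   }
   ```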





[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1309275901

   @flinkbot run azure




[GitHub] [flink] gyfora commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1310039059

   If the other builds are also affected we should backport this I think 👍 




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1305441287

   Now it died w/ the following exception:
   ```
   Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'partition_failure_recovery_EventTime' already exists.
   ```
   Just to try it out I've temporarily cherry-picked #21189 to see how it behaves.
   




[GitHub] [flink] gyfora commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1018881122


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -204,7 +204,7 @@ public void testFlushAfterClosed() {
 
     @Test(timeout = 30000L)
     public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
-        String topic = "flink-kafka-producer-txn-coordinator-changed";
+        String topic = "flink-kafka-producer-txn-coordinator-changed-" + UUID.randomUUID();

Review Comment:
   I think the point here is that adding randomness in general doesn't hurt. It might not be strictly necessary everywhere, but it's easier to add it everywhere to fix the test instability, which is a very urgent matter.
   
   Let's go ahead with the current implementation; if someone wants to improve this further we can do it once the instabilities are gone.





[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1309212456

   Conceptually not all tests need random topics, just those where retry is happening. Of course adding randomness in general would be better design. If you guys ask me, later on we need to throw out a huge amount of code in this area and just rewrite it to eliminate random failures. That said, we need a full concept and roughly several weeks to understand what each test does and work through the design. What I've done here is a no-brainer copy-paste because I don't have that time now and just wanted to make it work. Adding any minor beautification to this area is a waste of time from my perspective.




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1309415509

   A couple of tests failed because they expect hardcoded names, so I rolled them back.




[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017854901


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java:
##########
@@ -88,8 +88,11 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
         try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
-            adminClient.createTopics(
-                    Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)));
+            adminClient
+                    .createTopics(
+                            Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)))
+                    .all()
+                    .get();
             // Use the admin client to trigger the creation of internal __consumer_offsets topic.
             // This makes sure that we won't see unavailable coordinator in the tests.
             waitUtil(

Review Comment:
   In short yes. To give a little bit more detail:
   
   * We create a general public topic w/ `createTopics`, but the `waitUtil` is waiting for an internal topic named `__consumer_offsets` to be created. They have nothing to do w/ each other, unless we consider the edge case where there is no public topic inside Kafka and nobody has enforced consumer group offset storage.
   * It's not written anywhere, but according to my Kafka friends, if one wants stability then one must wait for the future in all places where the API gives one back.
   * Purely personal opinion: I've been dealing w/ Kafka for ~5 years, and if something is forgotten to be synced then one can see such magical blow-ups, which are a horror to catch :)





[GitHub] [flink] gyfora merged pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #21247:
URL: https://github.com/apache/flink/pull/21247




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1305339940

   This would explain a couple of facts/behaviors. Namely, test retry was introduced because of failures. I can easily imagine the following situation, which would explain why retry was introduced:
   * Test initiated topic creation
   * Topic not yet created
   * Test tried to read/write topic and failed (no topic auto creation)
   * Test retried
   * Topic creation finished
   * Test may have passed
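   
   The sequence above can be reproduced deterministically with a toy model. Everything here is hypothetical: the "broker" is an in-memory set, and a latch decides when the simulated creation is allowed to finish:
   
   ```java
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.CountDownLatch;
   
   final class TopicCreationRace {
       private final Set<String> topics = ConcurrentHashMap.newKeySet();
       // The latch lets the demo control when the simulated broker finishes.
       private final CountDownLatch brokerMayFinish = new CountDownLatch(1);
   
       // Returns immediately; the topic only appears once the latch is released.
       CompletableFuture<Void> createTopicAsync(String topic) {
           return CompletableFuture.runAsync(() -> {
               try {
                   brokerMayFinish.await();
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException(e);
               }
               topics.add(topic);
           });
       }
   
       boolean topicExists(String topic) {
           return topics.contains(topic);
       }
   
       void releaseBroker() {
           brokerMayFinish.countDown();
       }
   
       public static void main(String[] args) {
           TopicCreationRace broker = new TopicCreationRace();
           CompletableFuture<Void> created = broker.createTopicAsync("demo");
           System.out.println("right after the call: " + broker.topicExists("demo")); // false
           broker.releaseBroker();
           created.join(); // wait for the simulated creation to complete
           System.out.println("after waiting: " + broker.topicExists("demo")); // true
       }
   }
   ```
   
   A test that reads or writes right after the call sees no topic, while one that waits on the future first always does.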
   




[GitHub] [flink] XComp commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1308575925

   FYI: I created a follow-up Jira issue for the Kafka environment topic: FLINK-29956




[GitHub] [flink] XComp commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1306963927

   I'm also wondering whether it would be reasonable to unify the Kafka test utilities (in a follow-up task). :thinking: Multiple tests create their own admin client and deal with Kafka environments differently (`KafkaSourceTestEnv`, `KafkaTestEnvironmentImpl`, `KafkaTableTestBase`). It feels like we should have a JUnit5 Extension for that as well. We could also eventually move the topic randomization into a single location. WDYT?




[GitHub] [flink] XComp commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1018791713


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -204,7 +204,7 @@ public void testFlushAfterClosed() {
 
     @Test(timeout = 30000L)
     public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
-        String topic = "flink-kafka-producer-txn-coordinator-changed";
+        String topic = "flink-kafka-producer-txn-coordinator-changed-" + UUID.randomUUID();

Review Comment:
   Just an example code location where I don't understand why we have to add randomness. As far as I understand now (and based on your previous comment), we're adding the randomness in cases where a test retry is potentially triggered due to some flakiness. I would also understand it if a topic were reused in other tests, which is not the case here.
   It feels like I'm missing something because I don't see any of the reasons I could come up with applying here. There are a few other tests where I'm struggling to find the reason as well.





[GitHub] [flink] XComp commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017969434


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java:
##########
@@ -1238,7 +1238,7 @@ public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exce
         // create topics with content
         final List<String> topics = new ArrayList<>();
         for (int i = 0; i < numTopics; i++) {
-            final String topic = topicNamePrefix + i;
+            final String topic = topicNamePrefix + i + UUID.randomUUID();

Review Comment:
   I noticed that we also have other occurrences where we're not adding the UUID to the topic (I didn't check that in my first pass). Is there a specific reason why we only randomize certain topics? Alternatively, we could add the randomization of topics to [KafkaTestBase.createTestTopic](https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java#L215) and [KafkaTableTestBase#createTestTopic](https://github.com/apache/flink/blob/ca9fea2459f7e066eac0fb59b382e58c41e0d702/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java#L132). We would have to change the methods' signatures to return the randomized topic String (we should rename the methods in that case as well), which would then be used in the corresponding tests:
   ```
   // old method signature:
   public void createTestTopic(String topic, int numPartitions, int replicationFactor) {
   // ...
   // new method signature:
   public String createRandomizedTestTopic(String topicPrefix, int numPartitions, int replicationFactor) {
   ```
   WDYT?
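
To make the proposed signature change concrete, here is a rough JDK-only sketch. `RandomizedTopics` and the `TopicCreator` hook are hypothetical; in the real test bases the hook would be the admin-client call that creates the topic and waits for it.

```java
import java.util.UUID;

final class RandomizedTopics {

    /** Hypothetical hook standing in for the admin-client call that creates the topic. */
    @FunctionalInterface
    interface TopicCreator {
        void create(String topic, int numPartitions, int replicationFactor);
    }

    /** Creates a topic with a random suffix and returns the name that was actually used. */
    static String createRandomizedTestTopic(
            String topicPrefix, int numPartitions, int replicationFactor, TopicCreator creator) {
        // A separator keeps the prefix readable next to the random suffix.
        String topic = topicPrefix + "-" + UUID.randomUUID();
        creator.create(topic, numPartitions, replicationFactor);
        return topic;
    }

    public static void main(String[] args) {
        String topic = createRandomizedTestTopic(
                "my-test-topic", 1, 1,
                (name, partitions, replication) -> System.out.println("creating " + name));
        System.out.println("test reads/writes " + topic);
    }
}
```

Returning the name forces every caller to use the randomized topic, so a test cannot accidentally keep reading from the bare prefix.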





[GitHub] [flink] XComp commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017711266


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java:
##########
@@ -88,8 +88,11 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
         try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
-            adminClient.createTopics(
-                    Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)));
+            adminClient
+                    .createTopics(
+                            Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)))
+                    .all()
+                    .get();
             // Use the admin client to trigger the creation of internal __consumer_offsets topic.
             // This makes sure that we won't see unavailable coordinator in the tests.
             waitUtil(

Review Comment:
   Is the wait still necessary if we make the topic creation blocking?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java:
##########
@@ -135,8 +134,16 @@ public void createTestTopic(String topic, int numPartitions, int replicationFact
         properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
         try (AdminClient admin = AdminClient.create(properties)) {
             admin.createTopics(
-                    Collections.singletonList(
-                            new NewTopic(topic, numPartitions, (short) replicationFactor)));
+                            Collections.singletonList(
+                                    new NewTopic(topic, numPartitions, (short) replicationFactor)))
+                    .all()
+                    .get();
+        } catch (Exception e) {
+            throw new IllegalStateException(

Review Comment:
   We could have come up with a general exception for these kinds of cases as well, since they all reflect the same issue. That way, we could have encapsulated the `String.format` and varargs in the constructor. Just an idea... I'm not insisting on that change, considering that we're planning to create a follow-up where all of that is revisited anyway.
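
One possible shape for such an exception, as a sketch only. The name `TopicOperationException` is hypothetical; it extends `IllegalStateException` on the assumption that existing callers catching that type should keep working.

```java
final class TopicOperationException extends IllegalStateException {

    /** Formats the message once, so call sites do not repeat String.format themselves. */
    TopicOperationException(Throwable cause, String messageFormat, Object... args) {
        super(String.format(messageFormat, args), cause);
    }

    public static void main(String[] args) {
        try {
            throw new TopicOperationException(
                    new RuntimeException("broker unavailable"),
                    "Failed to create topic [%s, partitions: %d, replication factor: %d].",
                    "demo-topic", 1, 1);
        } catch (TopicOperationException e) {
            System.out.println(e.getMessage() + " Cause: " + e.getCause().getMessage());
        }
    }
}
```

Putting the cause first keeps the varargs at the end of the parameter list, which is what allows the format arguments to be passed inline.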



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -204,7 +204,7 @@ public void testFlushAfterClosed() {
 
     @Test(timeout = 30000L)
     public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
-        String topic = "flink-kafka-producer-txn-coordinator-changed";
+        String topic = "flink-kafka-producer-txn-coordinator-changed" + UUID.randomUUID();

Review Comment:
   ```suggestion
           String topic = "flink-kafka-producer-txn-coordinator-changed-" + UUID.randomUUID();
   ```
   That's really a nitty thing but I would add a separator between the topic prefix and the random ID. But I leave it up to you based on how much things like that annoy you as well. :-D
   
   This comment applies to a few other code locations as well.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1018270360


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -429,13 +428,12 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
                 admin.createTopics(
                         Collections.singletonList(
                                 new NewTopic(topic, numPartitions, replicationFactor)));
-        result.all().get(1, TimeUnit.MINUTES);

Review Comment:
   Moved.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java:
##########
@@ -100,10 +98,7 @@ private void createTopic(String topicName, int numPartitions, short replicationF
                 replicationFactor);
         NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
         try {
-            kafkaAdminClient

Review Comment:
   Moved.





[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1306862718

   With a fresh mind, I think I've found the last issue :) Fixing it.
   Additionally, removing all timeouts as we've agreed, and let's see...




[GitHub] [flink] gaborgsomogyi commented on pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on PR #21247:
URL: https://github.com/apache/flink/pull/21247#issuecomment-1305328264

   cc @XComp 




[GitHub] [flink] XComp commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1015250934


##########
flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java:
##########
@@ -261,8 +262,20 @@ private void sendMessages(String topic, String... messages) {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
         props.put(ProducerConfig.ACKS_CONFIG, "all");
 
+        int numPartitions = 1;
+        short replicationFactor = 1;

Review Comment:
   ```suggestion
           final int numPartitions = 1;
           final short replicationFactor = 1;
   ```
   nit





[GitHub] [flink] XComp commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017943854


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java:
##########
@@ -1238,7 +1238,7 @@ public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exce
         // create topics with content
         final List<String> topics = new ArrayList<>();
         for (int i = 0; i < numTopics; i++) {
-            final String topic = topicNamePrefix + i;
+            final String topic = topicNamePrefix + i + UUID.randomUUID();

Review Comment:
   ```suggestion
               final String topic = String.format("%s-%d-%s", topicNamePrefix, i, UUID.randomUUID());
   ```
   sorry for being nitpicky about this specific one but I feel like here it's even more crucial that we add a separator since we provide this `i` identifier to each topic which would be kind of swallowed into the UUID.
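
A small illustration of how the index can get swallowed. UUIDs are hex strings and may start with a digit, so short fixed suffixes are used here in place of real UUIDs to keep the output deterministic; the class and method names are hypothetical.

```java
final class TopicNameSeparator {

    /** Mirrors `topicNamePrefix + i + UUID.randomUUID()` from the diff. */
    static String withoutSeparator(String prefix, int index, String randomSuffix) {
        return prefix + index + randomSuffix;
    }

    /** Mirrors the suggested `String.format("%s-%d-%s", ...)`. */
    static String withSeparator(String prefix, int index, String randomSuffix) {
        return String.format("%s-%d-%s", prefix, index, randomSuffix);
    }

    public static void main(String[] args) {
        // Index 1 with suffix "2ab" and index 12 with suffix "ab" collapse to the same name.
        System.out.println(withoutSeparator("topic", 1, "2ab"));
        System.out.println(withoutSeparator("topic", 12, "ab"));
        // With separators the index stays recoverable.
        System.out.println(withSeparator("topic", 1, "2ab"));
        System.out.println(withSeparator("topic", 12, "ab"));
    }
}
```

The first two lines both print `topic12ab`, while the last two print `topic-1-2ab` and `topic-12-ab`.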



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java:
##########
@@ -100,10 +98,7 @@ private void createTopic(String topicName, int numPartitions, short replicationF
                 replicationFactor);
         NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
         try {
-            kafkaAdminClient

Review Comment:
   The diff in `KafkaSinkExternalContext` should go into the FLINK-29914 commit.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -429,13 +428,12 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
                 admin.createTopics(
                         Collections.singletonList(
                                 new NewTopic(topic, numPartitions, replicationFactor)));
-        result.all().get(1, TimeUnit.MINUTES);

Review Comment:
   The diff in `KafkaSinkITCase` should go into the FLINK-29914 commit.





[GitHub] [flink] XComp commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017896227


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java:
##########
@@ -88,8 +88,11 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
         try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
-            adminClient.createTopics(
-                    Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)));
+            adminClient
+                    .createTopics(
+                            Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS, (short) 1)))
+                    .all()
+                    .get();
             // Use the admin client to trigger the creation of internal __consumer_offsets topic.
             // This makes sure that we won't see unavailable coordinator in the tests.
             waitUtil(

Review Comment:
   fair enough - I just wanted to bring it up at least once. I'm not in a position to judge here. :-) But it sounds reasonable to keep it to be on the safe side.





[GitHub] [flink] gaborgsomogyi commented on a diff in pull request #21247: [FLINK-29914][tests] Wait for Kafka topic creation/deletion

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1018783788


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java:
##########
@@ -1238,7 +1238,7 @@ public void runProduceConsumeMultipleTopics(boolean useLegacySchema) throws Exce
         // create topics with content
         final List<String> topics = new ArrayList<>();
         for (int i = 0; i < numTopics; i++) {
-            final String topic = topicNamePrefix + i;
+            final String topic = topicNamePrefix + i + UUID.randomUUID();

Review Comment:
   Ah, I made that change at 10pm with a super tired brain. Added it now.


