Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/08 18:31:52 UTC

[GitHub] [flink] ashmeet-kandhari opened a new pull request, #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

ashmeet-kandhari opened a new pull request, #20991:
URL: https://github.com/apache/flink/pull/20991

   ## What is the purpose of the change
   
   *This pull request migrates the flink-connector-kafka module unit test cases to JUnit 5*
   
   ## Brief change log
   
     - *Updated simple JUnit 4 test packages to JUnit 5 test packages*
     - *Disabled a few failing test cases, each with a TODO comment, so they can be discussed or investigated further*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by "ashmeet-kandhari (via GitHub)" <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1416496203

   Hi @snuyanzin 
   
   I am thinking of creating a new branch and starting slowly.
   




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047274424


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -49,9 +50,9 @@
 import java.util.UUID;
 
 /** IT cases for Kafka. */
-public class KafkaITCase extends KafkaConsumerTestBase {
+class KafkaITCase extends KafkaConsumerTestBase {
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   Could be package-private or protected, I guess.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified
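
As a sketch of what this comment asks for: JUnit 4's `@Test(timeout = ...)` is expressed in milliseconds, while JUnit 5's `@Timeout` takes a `value` and an optional `unit` that defaults to seconds. Spelling the unit out keeps a migrated `60L` from being misread as 60 ms. The helper class and method names below are hypothetical and exist only to illustrate the conversion; they are not part of the PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: maps JUnit 4 millisecond timeouts to explicit JUnit 5 annotations. */
class TimeoutMigrationSketch {

    /** Converts a JUnit 4 {@code @Test(timeout = ...)} value (milliseconds) to whole seconds. */
    static long toSeconds(long junit4TimeoutMillis) {
        return TimeUnit.MILLISECONDS.toSeconds(junit4TimeoutMillis);
    }

    /** Renders the JUnit 5 annotation text with the time unit spelled out. */
    static String toJUnit5Annotation(long junit4TimeoutMillis) {
        return "@Timeout(value = " + toSeconds(junit4TimeoutMillis) + ", unit = TimeUnit.SECONDS)";
    }

    public static void main(String[] args) {
        // Timeout values taken from the diffs in this thread.
        for (long millis : new long[] {120_000L, 60_000L, 30_000L}) {
            System.out.println("@Test(timeout = " + millis + ")  ->  " + toJUnit5Annotation(millis));
        }
    }
}
```

In a test class, the resulting annotation sits next to `@Test` and needs `java.util.concurrent.TimeUnit` and `org.junit.jupiter.api.Timeout` imported.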



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047270638


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   time unit should be specified
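
Separately from the timeout remark, the diff above replaces `expected = IllegalStateException.class` with `assertThatThrownBy(...)`. One likely benefit is that the exception check is scoped to a single call: with `expected`, an exception thrown by a setup statement would also make the test pass. The following is a minimal, dependency-free sketch of that idea. `throwsInstanceOf` and `FakeProducer` are hypothetical stand-ins, not AssertJ or Flink APIs.

```java
/** Hypothetical sketch of a call-scoped exception check, in the spirit of assertThatThrownBy. */
class ThrownBySketch {

    /** Runs {@code call} and reports whether it threw an instance of {@code expected}. */
    static boolean throwsInstanceOf(Class<? extends Throwable> expected, Runnable call) {
        try {
            call.run();
        } catch (Throwable t) {
            return expected.isInstance(t);
        }
        return false; // nothing was thrown
    }

    /** Hypothetical stand-in for a producer that rejects calls after close(). */
    static final class FakeProducer {
        private boolean closed;

        void close() {
            closed = true;
        }

        void beginTransaction() {
            if (closed) {
                throw new IllegalStateException("producer is closed");
            }
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        producer.close(); // setup runs outside the checked call
        // Only the call under test is wrapped, so a failure in setup cannot satisfy the check.
        System.out.println(throwsInstanceOf(IllegalStateException.class, producer::beginTransaction));
    }
}
```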





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047272272


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -219,7 +227,8 @@ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
      * pending records. The test for that is covered in testAtLeastOnceProducer.
      */
     @SuppressWarnings("unchecked")
-    @Test(timeout = 5000)
+    @Test
+    @Timeout(5L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -280,7 +289,8 @@ public void go() throws Exception {
      * the test will not finish if the logic is broken.
      */
     @SuppressWarnings("unchecked")
-    @Test(timeout = 10000)
+    @Test
+    @Timeout(10L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -353,7 +363,8 @@ public void go() throws Exception {
      * records; we set a timeout because the test will not finish if the logic is broken.
      */
     @SuppressWarnings("unchecked")
-    @Test(timeout = 5000)
+    @Test
+    @Timeout(10L)

Review Comment:
   time unit should be specified





[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by "ashmeet-kandhari (via GitHub)" <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1613909720

   Hi @sergey,
   
   Unfortunately, I will not be able to work on this because of personal commitments.
   
   It would be better to reassign this task.
   
   Regards,
   Ashmeet.
   
   
   On Fri, Jun 16, 2023, 04:24 ashmeet kandhari ***@***.***>
   wrote:
   
   > Hi @Sergey,
   >
   > I am on vacation and will be back in July.
   >
   > I can continue after that.
   >
   > Thanks
   > Ashmeet
   >
   > On Tue, Jun 6, 2023, 13:11 Sergey Nuyanzin ***@***.***>
   > wrote:
   >
   >> Hi @ashmeet-kandhari <https://github.com/ashmeet-kandhari> I wonder
   >> whether you are going to continue working on this PR or not?
   >
   




[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1279718997

   > Thanks for your contribution @ashmeet-kandhari. As I mentioned in the Jira issue, we could continue code-related discussions here.
   > 
   > Could you please share more details or a link to the CI run with the failure you mentioned? Also, about the changes: I think the changes to `docs/themes/book` are not necessary for this PR and could be removed.
   
   Hi @snuyanzin,
   I was testing the migration changes made to KafkaTableITCase and UpsertKafkaTableITCase locally, and I have pushed them now.
   I will check the CI again and let you know if it's failing.
   
   I will also work on reverting the changes made in `docs/themes/book`. I am not sure what caused them, as I have not touched that folder.




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034898677


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java:
##########
@@ -29,7 +30,8 @@
  * Tests for serializing and deserialzing {@link KafkaWriterState} with {@link
  * KafkaWriterStateSerializer}.
  */
-public class KafkaWriterStateSerializerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaWriterStateSerializerTest {

Review Comment:
   ```suggestion
   class KafkaWriterStateSerializerTest {
   ```
   could be package private



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionIdFactoryTest.java:
##########
@@ -17,14 +17,16 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link TransactionalIdFactory}. */
-public class TransactionIdFactoryTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
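
A sketch of what this suggestion could look like, assuming the standard Maven test-resources layout (the `src/test/resources` paths are an assumption; the extension class name is taken from the import in the diff above). JUnit 5 can pick up extensions through Java's `ServiceLoader` mechanism, so the per-class `@ExtendWith(TestLoggerExtension.class)` annotation becomes unnecessary. The service file lists the extension:

```
# src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
org.apache.flink.util.TestLoggerExtension
```

Automatic detection is off by default in JUnit 5, so it also has to be switched on, for example in `junit-platform.properties`:

```
# src/test/resources/junit-platform.properties
junit.jupiter.extensions.autodetection.enabled=true
```

If the module or one of its test dependencies already ships these two files, nothing needs to be added beyond removing the annotation.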





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047582623


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testCommitTransactionAfterClosed() {
         String topicName = "testCommitTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.commitTransaction();
+        assertThatThrownBy(kafkaProducer::commitTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testResumeTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testResumeTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testAbortTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   By default the unit is seconds; do we still need to specify it?





[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1348765223

   It seems that the 4th and 5th problems are connected.
   Regarding
   > 5th Comment: I might need some help with reverting that change, as I cannot see anything in IntelliJ that points to changes made in that folder
   
   you can remove the dedicated commit or grant me access to your branch, and I can do it.




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047274864


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1000270961


##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\

Review Comment:
   Is there a good reason why the archunit violations should be updated?
   Otherwise, I think this should be reverted.





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1003145131


##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\

Review Comment:
   Hi @snuyanzin 
   
   These archunit violations are updated automatically when I build the `flink-connector-kafka` module.
   As I have not been touching these files myself, I thought these were expected changes.
   
   Can you please advise me on what might be going wrong, given that this file gets updated automatically when building the module?
   
   





Re: [PR] [FLINK-25538][flink-connector-kafka] JUnit5 Migration [flink]

Posted by "davidradl (via GitHub)" <gi...@apache.org>.
davidradl commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1746931693

   @snuyanzin Given https://github.com/apache/flink/pull/22797, this PR is no longer relevant to this repository. I suggest closing it and porting the change to the [new Flink Kafka connector repo](https://github.com/apache/flink-connector-kafka) if it is still needed.




[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047581701


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -102,8 +106,9 @@ public void testHappyPath() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L)
-    public void testResumeTransaction() throws Exception {
+    @Test
+    @Timeout(30L)

Review Comment:
   `@Timeout` uses seconds by default; do we still need to specify the unit?
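
The unit question comes down to simple arithmetic: JUnit 4's `@Test(timeout = ...)` is expressed in milliseconds, while JUnit 5's `@Timeout` defaults to seconds and accepts an explicit `unit`. The sketch below shows only that conversion, using the standard library. `TimeoutMigration` and its methods are hypothetical names introduced for illustration; they are not part of Flink, JUnit, or this PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only; not part of Flink or JUnit. */
final class TimeoutMigration {
    private TimeoutMigration() {}

    /**
     * Converts a JUnit 4 timeout (milliseconds) to whole seconds.
     * Values that are not a multiple of 1000 are truncated.
     */
    static long toSeconds(long junit4TimeoutMillis) {
        return TimeUnit.MILLISECONDS.toSeconds(junit4TimeoutMillis);
    }

    /** Renders the JUnit 5 annotation text with the unit spelled out. */
    static String toAnnotation(long junit4TimeoutMillis) {
        return "@Timeout(value = " + toSeconds(junit4TimeoutMillis) + ", unit = TimeUnit.SECONDS)";
    }
}
```

For the hunk above, `toSeconds(30000L)` yields 30, which matches `@Timeout(30L)`. Writing the unit explicitly, as the rendered annotation does, keeps the intent readable without relying on the default.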





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047268582


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -29,25 +29,28 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for our own {@link FlinkKafkaInternalProducer}. */
 @SuppressWarnings("serial")
-public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
+class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
     protected String transactionalId;
     protected Properties extraProperties;
     private volatile Exception exceptionInCallback;
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   Could be package-private, I guess.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -64,7 +67,7 @@ public static void prepare() throws Exception {
                         .setKafkaServerProperties(serverProperties));
     }
 
-    @Before
+    @BeforeEach
     public void before() {

Review Comment:
   Could be package-private, I guess.





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1003145898


##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\

Review Comment:
   I also tried reverting these file changes and running just `mvn test` on this module alone, and the file is again updated automatically.





[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1303971800

   Hi @snuyanzin 
   
   I have addressed the review comments.
   
   Let me know if anything else is needed.




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034893121


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java:
##########
@@ -30,7 +31,8 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KafkaSinkBuilder}. */
-public class KafkaSinkBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Registering the extension via `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
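
A minimal sketch of what that service-file registration could look like. It assumes the extension's fully qualified name is `org.apache.flink.util.TestLoggerExtension` and that test resources live under `src/test/resources`; neither detail is taken from this PR. JUnit 5 only loads extensions listed this way when automatic extension detection is enabled, shown here through `junit-platform.properties`.

```
# File: src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
# (assumed class name; one fully qualified extension class per line)
org.apache.flink.util.TestLoggerExtension

# File: src/test/resources/junit-platform.properties
junit.jupiter.extensions.autodetection.enabled = true
```

With both files on the test classpath, the per-class `@ExtendWith(TestLoggerExtension.class)` annotation should no longer be needed.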





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034895784


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -143,12 +151,13 @@ public static void setupAdmin() {
         admin = AdminClient.create(properties);
     }
 
-    @AfterClass
+    @AfterAll
     public static void teardownAdmin() {

Review Comment:
   ```suggestion
       static void teardownAdmin() {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034906776


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java:
##########
@@ -45,17 +45,19 @@ public class KafkaSubscriberTest {
     private static final TopicPartition NON_EXISTING_TOPIC = new TopicPartition("removed", 0);
     private static AdminClient adminClient;
 
-    @BeforeClass
+    @BeforeAll
     public static void setup() throws Throwable {

Review Comment:
   could be package private
   ```suggestion
       static void setup() throws Throwable {
   ```





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1006434223


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java:
##########
@@ -39,7 +37,7 @@
 /** Failure Recovery IT Test for KafkaShuffle. */
 public class KafkaShuffleExactlyOnceITCase extends KafkaShuffleTestBase {
 
-    @Rule public final Timeout timeout = Timeout.millis(600000L);
+    //    @RegisterExtension public final Timeout timeout = Timeout.millis(600000L);

Review Comment:
   fixed it



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java:
##########
@@ -61,7 +59,7 @@
 /** Simple End to End Test for Kafka. */
 public class KafkaShuffleITCase extends KafkaShuffleTestBase {
 
-    @Rule public final Timeout timeout = Timeout.millis(600000L);
+    //    @RegisterExtension public final Timeout timeout = Timeout.millis(600000L);

Review Comment:
   fixed it





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047275232


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly.
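
   JUnit 5's `@Timeout` counts in seconds when no unit is given, so `@Timeout(60L)` matches the old `@Test(timeout = 60000)`, but a reader cannot see that from the annotation alone. A minimal JDK-only sketch of the conversion and of the explicit form this comment presumably asks for; the class and method names are hypothetical and not part of the PR:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the PR: maps a JUnit 4 timeout to an explicit JUnit 5 one. */
public class TimeoutMigration {

    /** Converts a JUnit 4 {@code @Test(timeout = ...)} value, given in milliseconds, to seconds. */
    static long toSeconds(long junit4TimeoutMillis) {
        return TimeUnit.MILLISECONDS.toSeconds(junit4TimeoutMillis);
    }

    /** Renders the JUnit 5 annotation with the unit spelled out instead of implied. */
    static String explicitAnnotation(long junit4TimeoutMillis) {
        return "@Timeout(value = " + toSeconds(junit4TimeoutMillis) + ", unit = TimeUnit.SECONDS)";
    }

    public static void main(String[] args) {
        // The two timeout values that appear in the KafkaITCase diff.
        System.out.println(explicitAnnotation(60000L));
        System.out.println(explicitAnnotation(120000L));
    }
}
```

   Applied to the diff, `@Timeout(60L)` would become `@Timeout(value = 60, unit = TimeUnit.SECONDS)`, which needs an import of `java.util.concurrent.TimeUnit` in the test class.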



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -79,20 +77,20 @@ public String getOperatorSnapshotPath(FlinkVersion version) {
      * Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
      * manually
      */
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {}
 
     /**
      * Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
      * manually
      */
-    @AfterClass
+    @AfterAll
     public static void shutDownServices() throws Exception {}

Review Comment:
   Could be package-private or protected, I guess.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -65,16 +68,16 @@ public class KafkaShortRetentionTestBase implements Serializable {
     private static KafkaTestEnvironment kafkaServer;
     private static Properties standardProps;
 
-    @ClassRule
-    public static MiniClusterWithClientResource flink =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    public static MiniClusterExtension flink =
+            new MiniClusterExtension(
                     new MiniClusterResourceConfiguration.Builder()
                             .setConfiguration(getConfiguration())
                             .setNumberTaskManagers(NUM_TMS)
                             .setNumberSlotsPerTaskManager(TM_SLOTS)
                             .build());
 
-    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public static Path tempFolder;

Review Comment:
   Does it need to be `public`?
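
   Separately from visibility, the switch from `TemporaryFolder` to a `Path` changes call sites: `Path` has no `newFolder()` or `newFile()`, so sub-directories and files would be created through `java.nio.file.Files`. A minimal JDK-only sketch, assuming the test code used those `TemporaryFolder` methods; the class and method names are hypothetical, and `Files.createTempDirectory` stands in for the directory JUnit would inject:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper, not part of the PR: Path-based stand-ins for TemporaryFolder calls. */
public class TempDirCalls {

    /** Stand-in for tempFolder.newFolder(): creates a uniquely named directory under root. */
    static Path newFolder(Path root) throws IOException {
        return Files.createTempDirectory(root, "junit");
    }

    /** Stand-in for tempFolder.newFile(name): creates an empty file under root. */
    static Path newFile(Path root, String name) throws IOException {
        return Files.createFile(root.resolve(name));
    }

    public static void main(String[] args) throws IOException {
        // In a real test, JUnit 5 would inject this directory into the @TempDir field.
        Path root = Files.createTempDirectory("tempdir-sketch");
        Path folder = newFolder(root);
        Path file = newFile(root, "data.txt");
        System.out.println("folder created under root: " + folder.getParent().equals(root));
        System.out.println("file exists: " + Files.isRegularFile(file));
    }
}
```

   Unlike the directory managed by `@TempDir`, the root created in `main` is not cleaned up automatically; that is only acceptable in a throwaway sketch.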



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -84,7 +87,7 @@ private static Configuration getConfiguration() {
         return flinkConfig;
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   Does it need to be `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -92,17 +93,15 @@ public abstract class KafkaTestBase extends TestLogger {
 
     public static KafkaTestEnvironment kafkaServer;
 
-    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public static Path tempFolder;
 
     public static Properties secureProps = new Properties();
 
-    @Rule public final RetryRule retryRule = new RetryRule();
-
     // ------------------------------------------------------------------------
     //  Setup and teardown of the mini clusters
     // ------------------------------------------------------------------------
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   Does it need to be `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromTimestamp() throws Exception {
         runStartFromTimestamp();
     }
 
     // --- offset committing ---
 
-    @Test(timeout = 60000)
-    public void testCommitOffsetsToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCommitOffsetsToKafka() throws Exception {
         runCommitOffsetsToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
         runAutoOffsetRetrievalAndCommitToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testCollectingSchema() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -92,17 +93,15 @@ public abstract class KafkaTestBase extends TestLogger {
 
     public static KafkaTestEnvironment kafkaServer;
 
-    @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+    @TempDir public static Path tempFolder;

Review Comment:
   Does it need to be `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -79,20 +77,20 @@ public String getOperatorSnapshotPath(FlinkVersion version) {
      * Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
      * manually
      */
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {}

Review Comment:
   Could be package-private or protected, I guess.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromTimestamp() throws Exception {
         runStartFromTimestamp();
     }
 
     // --- offset committing ---
 
-    @Test(timeout = 60000)
-    public void testCommitOffsetsToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCommitOffsetsToKafka() throws Exception {
         runCommitOffsetsToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
         runAutoOffsetRetrievalAndCommitToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testCollectingSchema() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCollectingSchema() throws Exception {
         runCollectingSchemaTest();
     }
 
     /** Kafka 20 specific test, ensuring Timestamps are properly written to and read from Kafka. */
-    @Test(timeout = 60000)
-    public void testTimestamps() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly, e.g. `@Timeout(value = 60, unit = TimeUnit.SECONDS)`.
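
For context on this comment: JUnit 4's `@Test(timeout = ...)` is expressed in milliseconds, whereas JUnit 5's `@Timeout` takes a value plus a `unit` that defaults to seconds. `@Timeout(60L)` therefore already means 60 seconds, but only implicitly. The sketch below shows the unit arithmetic behind the migration; the class and method names (`TimeoutMigrationSketch`, `toJUnit5Seconds`) are hypothetical and not part of the PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the PR: converts a JUnit 4 timeout to JUnit 5 seconds. */
final class TimeoutMigrationSketch {

    private TimeoutMigrationSketch() {}

    /** JUnit 4 timeouts are milliseconds; JUnit 5's @Timeout defaults to seconds. */
    static long toJUnit5Seconds(long junit4TimeoutMillis) {
        return TimeUnit.MILLISECONDS.toSeconds(junit4TimeoutMillis);
    }

    public static void main(String[] args) {
        // @Test(timeout = 60000)  ->  @Timeout(value = 60, unit = TimeUnit.SECONDS)
        System.out.println(toJUnit5Seconds(60_000L));
        // @Test(timeout = 120000) ->  @Timeout(value = 120, unit = TimeUnit.SECONDS)
        System.out.println(toJUnit5Seconds(120_000L));
    }
}
```

Spelling out `unit = TimeUnit.SECONDS` on the annotation keeps the intent visible to readers who still expect the JUnit 4 millisecond convention.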



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly, e.g. `@Timeout(value = 60, unit = TimeUnit.SECONDS)`.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromTimestamp() throws Exception {
         runStartFromTimestamp();
     }
 
     // --- offset committing ---
 
-    @Test(timeout = 60000)
-    public void testCommitOffsetsToKafka() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly, e.g. `@Timeout(value = 60, unit = TimeUnit.SECONDS)`.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
  */
 @SuppressWarnings("serial")
 @RetryOnFailure(times = 3)

Review Comment:
   Yes, it makes sense.
   It will simplify removing the JUnit 4 related stuff.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java:
##########
@@ -18,13 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.BeforeAll;
 
 /** IT cases for the {@link FlinkKafkaProducer}. */
 @SuppressWarnings("serial")
 public class KafkaProducerAtLeastOnceITCase extends KafkaProducerTestBase {
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   could be package private
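
Background for this suggestion: JUnit 5 does not require test classes, test methods, or lifecycle methods to be `public`; they only must not be `private`, because the framework locates and invokes them reflectively. The sketch below mimics that with plain reflection and no JUnit dependency; `PackagePrivateSketch` and `ExampleITCase` are hypothetical names, not classes from the PR.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical sketch, not part of the PR. */
final class PackagePrivateSketch {

    private PackagePrivateSketch() {}

    /** Stand-in for a migrated test class: neither the class nor its method is public. */
    static class ExampleITCase {
        static boolean prepared;

        // In a real JUnit 5 test this method would carry @BeforeAll.
        static void prepare() {
            prepared = true;
        }
    }

    /** True if the method has default (package-private) visibility. */
    static boolean isPackagePrivate(Method method) {
        int mods = method.getModifiers();
        return !Modifier.isPublic(mods)
                && !Modifier.isProtected(mods)
                && !Modifier.isPrivate(mods);
    }

    /** Finds and invokes prepare() reflectively, roughly as a test framework would. */
    static boolean discoverAndInvoke() {
        try {
            Method prepare = ExampleITCase.class.getDeclaredMethod("prepare");
            prepare.setAccessible(true);
            prepare.invoke(null); // static method, so no receiver instance
            return isPackagePrivate(prepare) && ExampleITCase.prepared;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(discoverAndInvoke());
    }
}
```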



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -113,11 +116,14 @@ public static void prepare() throws Exception {
         standardProps = kafkaServer.getStandardProperties();
     }
 
-    @AfterClass
-    public static void shutDownServices() throws Exception {
+    @AfterAll
+    public static void shutDownServices(@InjectMiniCluster MiniCluster miniCluster)
+            throws Exception {
         kafkaServer.shutdown();
 
         secureProps.clear();
+
+        miniCluster.close();

Review Comment:
   Why should we close something here if we didn't open it?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBigRecordJob() throws Exception {
         runBigRecordTestTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithLegacySerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithLegacySerializer() throws Exception {
         runProduceConsumeMultipleTopics(true);
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleTopicsWithKafkaSerializer() throws Exception {
         runProduceConsumeMultipleTopics(false);
     }
 
-    @Test(timeout = 60000)
-    public void testAllDeletes() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testAllDeletes() throws Exception {
         runAllDeletesTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMetricsAndEndOfStream() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMetricsAndEndOfStream() throws Exception {
         runEndOfStreamTest();
     }
 
     // --- startup mode ---
 
-    @Test(timeout = 60000)
-    public void testStartFromEarliestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromEarliestOffsets() throws Exception {
         runStartFromEarliestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromLatestOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromLatestOffsets() throws Exception {
         runStartFromLatestOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromGroupOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromGroupOffsets() throws Exception {
         runStartFromGroupOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromSpecificOffsets() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromSpecificOffsets() throws Exception {
         runStartFromSpecificOffsets();
     }
 
-    @Test(timeout = 60000)
-    public void testStartFromTimestamp() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testStartFromTimestamp() throws Exception {
         runStartFromTimestamp();
     }
 
     // --- offset committing ---
 
-    @Test(timeout = 60000)
-    public void testCommitOffsetsToKafka() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCommitOffsetsToKafka() throws Exception {
         runCommitOffsetsToKafka();
     }
 
-    @Test(timeout = 60000)
-    public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly, e.g. `@Timeout(value = 60, unit = TimeUnit.SECONDS)`.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBaseWithFlink.java:
##########
@@ -30,9 +30,9 @@ public abstract class KafkaTestBaseWithFlink extends KafkaTestBase {
 
     protected static final int TM_SLOTS = 8;
 
-    @ClassRule
-    public static MiniClusterWithClientResource flink =
-            new MiniClusterWithClientResource(
+    @RegisterExtension
+    public static final MiniClusterExtension FLINK =

Review Comment:
   do we need to have it `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -113,11 +116,14 @@ public static void prepare() throws Exception {
         standardProps = kafkaServer.getStandardProperties();
     }
 
-    @AfterClass
-    public static void shutDownServices() throws Exception {
+    @AfterAll
+    public static void shutDownServices(@InjectMiniCluster MiniCluster miniCluster)

Review Comment:
   do we need it `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -111,7 +110,7 @@ public static void prepare() throws Exception {
         startClusters(false);
     }
 
-    @AfterClass
+    @AfterAll
     public static void shutDownServices() throws Exception {

Review Comment:
   do we need to have it `public`?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -128,15 +126,19 @@ private OperatorSubtaskState initializeTestState() throws Exception {
     }
 
     @SuppressWarnings("warning")
-    @Test
+    @TestTemplate
     public void testRestoreProducer() throws Exception {

Review Comment:
   could be package private 



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java:
##########
@@ -18,21 +18,21 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 /** IT cases for the {@link FlinkKafkaProducer}. */
 @SuppressWarnings("serial")
-public class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
-    @BeforeClass
+class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   could be package private



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
     //  Suite of Tests
     // ------------------------------------------------------------------------
 
-    @Test(timeout = 120000)
-    public void testFailOnNoBroker() throws Exception {
+    @Test
+    @Timeout(120L)
+    void testFailOnNoBroker() throws Exception {
         runFailOnNoBrokerTest();
     }
 
-    @Test(timeout = 60000)
-    public void testConcurrentProducerConsumerTopology() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testConcurrentProducerConsumerTopology() throws Exception {
         runSimpleConcurrentProducerConsumerTopology();
     }
 
-    @Test(timeout = 60000)
-    public void testKeyValueSupport() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testKeyValueSupport() throws Exception {
         runKeyValueTest();
     }
 
     // --- canceling / failures ---
 
-    @Test(timeout = 60000)
-    public void testCancelingEmptyTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingEmptyTopic() throws Exception {
         runCancelingOnEmptyInputTest();
     }
 
-    @Test(timeout = 60000)
-    public void testCancelingFullTopic() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testCancelingFullTopic() throws Exception {
         runCancelingOnFullInputTest();
     }
 
     // --- source to partition mappings and exactly once ---
 
-    @Test(timeout = 60000)
-    public void testOneToOneSources() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneToOneSources() throws Exception {
         runOneToOneExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testOneSourceMultiplePartitions() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testOneSourceMultiplePartitions() throws Exception {
         runOneSourceMultiplePartitionsExactlyOnceTest();
     }
 
-    @Test(timeout = 60000)
-    public void testMultipleSourcesOnePartition() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testMultipleSourcesOnePartition() throws Exception {
         runMultipleSourcesOnePartitionExactlyOnceTest();
     }
 
     // --- broker failure ---
 
-    @Test(timeout = 60000)
-    public void testBrokerFailure() throws Exception {
+    @Test
+    @Timeout(60L)
+    void testBrokerFailure() throws Exception {
         runBrokerFailureTest();
     }
 
     // --- special executions ---
 
-    @Test(timeout = 60000)
-    public void testBigRecordJob() throws Exception {
+    @Test
+    @Timeout(60L)

Review Comment:
   The time unit should be specified explicitly, e.g. `@Timeout(value = 60, unit = TimeUnit.SECONDS)`.





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1003145898


##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\

Review Comment:
   I also tried reverting the changes to these files and running just the `mvn test` command on this module alone, and the file gets updated again.





[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1318656114

   As far as I can see, there are some conflicts; could you please resolve them?
   Also, it seems the build failed because of `spotless` issues. Could you please run `mvn spotless:apply` before committing? It will autoformat the code.
   




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034892886


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -45,15 +46,16 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
-public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaRecordSerializationSchemaBuilderTest {
 
     private static final String DEFAULT_TOPIC = "test";
 
     private static Map<String, ?> configurableConfiguration;
     private static Map<String, ?> configuration;
     private static boolean isKeySerializer;
 
-    @Before
+    @BeforeEach
     public void setUp() {

Review Comment:
   ```suggestion
       void setUp() {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1342859067

   Hi @ashmeet-kandhari, do you have any questions or concerns?




[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by "ashmeet-kandhari (via GitHub)" <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1594034092

   Hi @Sergey,
   
   I am on vacation and will be back in July.
   
   I can continue after that.
   
   Thanks
   Ashmeet
   
   On Tue, Jun 6, 2023, 13:11 Sergey Nuyanzin ***@***.***> wrote:
   
   > Hi @ashmeet-kandhari <https://github.com/ashmeet-kandhari>, I wonder
   > whether you are going to continue working on this PR or not?
   




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034894549


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -108,7 +113,9 @@
 import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for using KafkaSink writing to a Kafka cluster. */
-public class KafkaSinkITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Registering the extension via `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
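
For reference, JUnit 5 can register extensions automatically through Java's `ServiceLoader` mechanism, which removes the need for `@ExtendWith(TestLoggerExtension.class)` on every test class. A minimal sketch of the two files involved is below. The fully qualified class name is taken from the import shown elsewhere in this PR; the `src/test/resources` locations are an assumption, since this thread does not show where Flink keeps these files.

```
# Assumed location: src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
org.apache.flink.util.TestLoggerExtension

# Assumed location: src/test/resources/junit-platform.properties
junit.jupiter.extensions.autodetection.enabled = true
```

Auto-detection is off by default in JUnit 5, so the service file has no effect unless the property is set, either in `junit-platform.properties` or as a JVM system property.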





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034899201


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionIdFactoryTest.java:
##########
@@ -17,14 +17,16 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link TransactionalIdFactory}. */
-public class TransactionIdFactoryTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class TransactionIdFactoryTest {

Review Comment:
   ```suggestion
   class TransactionIdFactoryTest {
   ```
   could be package private



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java:
##########
@@ -17,19 +17,21 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link TransactionsToAbortChecker}. */
-public class TransactionToAbortCheckerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034897483


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java:
##########
@@ -49,19 +51,21 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
-public class KafkaTransactionLogITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaTransactionLogITCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkITCase.class);
     private static final String TOPIC_NAME = "kafkaTransactionLogTest";
     private static final String TRANSACTIONAL_ID_PREFIX = "kafka-log";
 
-    @ClassRule
+    @Container
     public static final KafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
 
     private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();
 
-    @After
+    @AfterEach
     public void tearDown() {

Review Comment:
   ```suggestion
       void tearDown() {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034904171


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -166,7 +167,8 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable {
         }
     }
 
-    @Test(timeout = 30000L)
+    @Test
+    @Timeout(30000L)

Review Comment:
   In JUnit 5 the default unit of `@Timeout` is seconds, while JUnit 4's `@Test(timeout = ...)` takes milliseconds.
   Do we really need a timeout of 30000 seconds?
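
To make the unit mismatch concrete, here is a small stdlib-only sketch of the arithmetic. The class `TimeoutUnits` and its methods are hypothetical helpers for illustration, not part of Flink or JUnit:

```java
import java.util.concurrent.TimeUnit;

/** Illustrative helper (hypothetical, not part of Flink or JUnit). */
final class TimeoutUnits {

    /** JUnit 4's @Test(timeout = ...) is in milliseconds; convert it to whole seconds. */
    static long junit4MillisToSeconds(long millis) {
        return TimeUnit.MILLISECONDS.toSeconds(millis);
    }

    /** A bare JUnit 5 @Timeout(value) is read as seconds; express that value in minutes. */
    static long junit5DefaultToMinutes(long value) {
        return TimeUnit.SECONDS.toMinutes(value);
    }

    public static void main(String[] args) {
        // @Test(timeout = 30000L) in JUnit 4 means 30 seconds ...
        System.out.println(junit4MillisToSeconds(30000L) + " s");
        // ... while @Timeout(30000L) in JUnit 5 means 30000 seconds, i.e. 500 minutes.
        System.out.println(junit5DefaultToMinutes(30000L) + " min");
        // An explicit equivalent of the old value would be:
        // @Timeout(value = 30, unit = TimeUnit.SECONDS)
    }
}
```

Spelling out `unit` in the annotation avoids relying on the reader knowing the default.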





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047264081


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -428,13 +434,13 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
                 admin.createTopics(
                         Collections.singletonList(
                                 new NewTopic(topic, numPartitions, replicationFactor)));
-        result.all().get();
+        result.all().get(1, TimeUnit.MINUTES);

Review Comment:
   why do we need this change?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -428,13 +434,13 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
                 admin.createTopics(
                         Collections.singletonList(
                                 new NewTopic(topic, numPartitions, replicationFactor)));
-        result.all().get();
+        result.all().get(1, TimeUnit.MINUTES);
     }
 
     private void deleteTestTopic(String topic)
             throws ExecutionException, InterruptedException, TimeoutException {
         final DeleteTopicsResult result = admin.deleteTopics(Collections.singletonList(topic));
-        result.all().get();
+        result.all().get(1, TimeUnit.MINUTES);

Review Comment:
   why do we need this change?
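
For context on what the changed lines do: `get()` without arguments blocks until the future completes, whereas `get(1, TimeUnit.MINUTES)` gives up with a `TimeoutException` once the bound is reached. Below is a minimal stdlib-only sketch of that difference, using `CompletableFuture` as a stand-in for the Kafka admin result. The class and method names are hypothetical, and whether this was the author's motivation is not stated in the thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch; not code from the PR. */
final class BoundedGetSketch {

    /** Returns "done" if the future completes within the bound, "timeout" otherwise. */
    static String awaitWithBound(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return "done";
        } catch (TimeoutException e) {
            // A bounded wait surfaces a stuck operation instead of hanging the test.
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Already completed: returns immediately.
        System.out.println(
                awaitWithBound(CompletableFuture.completedFuture(null), 1, TimeUnit.MINUTES));
        // Never completed: an unbounded get() would block forever here.
        System.out.println(
                awaitWithBound(new CompletableFuture<Object>(), 50, TimeUnit.MILLISECONDS));
    }
}
```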





[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323914415

   Hi @snuyanzin 
   
   I tried resolving the conflicts (haven't pushed yet), but when I try to build locally I get the following error:
   
   ```
    Too many files with unapproved license: 14 See RAT report in: D:\Projects\
   Intellij\git\flink\target\rat.txt 
   ```




[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323924577

   What are the changes there?
   From my point of view these files should not be changed and should not be a part of this PR.




[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1291980945

   I noticed that classes with the `@Parametrized` annotation are not migrated well.
   I would suggest a more incremental approach:
   start from an initial state (e.g. master) that passes all the tests,
   then convert one test, check that it passes, commit, and so on. In the case of a test hierarchy this could require changing more than one test at a time, but the approach will still simplify the detection of problem tests and possible issues.




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005571712


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
  */
 @SuppressWarnings("serial")
 @RetryOnFailure(times = 3)

Review Comment:
   The javadoc[1] for `RetryOnFailure` states that
   > \* <p>Add the {@link RetryRule} to your test class and annotate the class and/or tests with {@link
   > \* RetryOnFailure}.
   
   which means it only makes sense to use it with JUnit 4. With JUnit 5, `RetryOnException` should be used instead.
   
   [1] https://github.com/apache/flink/blob/f8c6a668cd2b887f33a0cf4608de2d6b95c71f03/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryOnFailure.java#L30
   






[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034896169


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -143,12 +151,13 @@ public static void setupAdmin() {
         admin = AdminClient.create(properties);
     }
 
-    @AfterClass
+    @AfterAll
     public static void teardownAdmin() {
         admin.close();
+        KAFKA_CONTAINER.close();
     }
 
-    @Before
+    @BeforeEach
     public void setUp() throws ExecutionException, InterruptedException, TimeoutException {

Review Comment:
   ```suggestion
       void setUp() throws ExecutionException, InterruptedException, TimeoutException {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034894913


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -108,7 +113,9 @@
 import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for using KafkaSink writing to a Kafka cluster. */
-public class KafkaSinkITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaSinkITCase {

Review Comment:
   ```suggestion
   class KafkaSinkITCase {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034893527


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java:
##########
@@ -30,7 +31,8 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KafkaSinkBuilder}. */
-public class KafkaSinkBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaSinkBuilderTest {

Review Comment:
   ```suggestion
   class KafkaSinkBuilderTest {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034907770


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java:
##########
@@ -53,7 +53,7 @@ public class KafkaRecordDeserializationSchemaTest {
     private static Map<String, ?> configuration;
     private static boolean isKeyDeserializer;
 
-    @Before
+    @BeforeEach
     public void setUp() {

Review Comment:
   ```suggestion
       void setUp() {
   ```
   could be package private





[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1325605146

   Hi @snuyanzin,
   
   I have resolved the conflicts. Please review whenever you can.




[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1277859592

   By the way, could you please try this command on your branch
   ```
   mvn -DskipTests -Dfast -Pskip-webui-build -T1C clean install
   ```
   to rule out that it is caused by some mismatch of SNAPSHOT versions of Flink modules




[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323920168

   Oh, I'm not sure why they are here, but this is what I found in the rat.txt report
   
   ```
   14 Unknown Licenses
   
   *****************************************************
   
   Files with unapproved licenses:
   
     flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
     flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
     flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
     flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot
     flink-connectors/flink-connector-files/src/test/resources/committable-serializer-migration/in-progress-v1/committable
     flink-connectors/flink-connector-files/src/test/resources/committable-serializer-migration/pending-v1/committable
     flink-connectors/flink-file-sink-common/src/test/resources/recoverable-serializer-migration/in-progress-v1/recoverable
     flink-connectors/flink-file-sink-common/src/test/resources/recoverable-serializer-migration/pending-v1/recoverable
     flink-core/src/test/resources/abstractID-with-toString-field
     flink-core/src/test/resources/abstractID-with-toString-field-set
     flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
     flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
     flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
     flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot
   ```




[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047581939


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   By default it takes seconds, do we still need to specify?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   By default it takes seconds, do we still need to specify?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   By default it takes seconds, do we still need to specify?
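
The hunks in this message also replace `@Test(expected = IllegalStateException.class)` with AssertJ's `assertThatThrownBy(...)`. With `expected`, an exception thrown anywhere in the test method satisfies the test; the lambda form only accepts an exception from the one call under test. A minimal stdlib-only sketch of that scoping follows; `throwsInstanceOf` and `Resource` are hypothetical stand-ins for the AssertJ call and the Kafka producer:

```java
/** Hypothetical sketch; not code from the PR and not the AssertJ implementation. */
final class ScopedThrowSketch {

    /** Runs only the given call and reports whether it threw an instance of the expected type. */
    static boolean throwsInstanceOf(Class<? extends Throwable> expected, Runnable call) {
        try {
            call.run();
            return false; // nothing was thrown
        } catch (Throwable t) {
            return expected.isInstance(t);
        }
    }

    /** Minimal resource that rejects use after close(), mirroring the tests above. */
    static final class Resource {
        private boolean closed;

        void close() {
            closed = true;
        }

        void use() {
            if (closed) {
                throw new IllegalStateException("already closed");
            }
        }
    }

    public static void main(String[] args) {
        Resource closedResource = new Resource();
        closedResource.close(); // setup runs outside the checked lambda
        System.out.println(throwsInstanceOf(IllegalStateException.class, closedResource::use));

        Resource openResource = new Resource();
        System.out.println(throwsInstanceOf(IllegalStateException.class, openResource::use));
    }
}
```

Because setup such as `close()` runs outside the lambda, a failure there surfaces as a test error rather than being mistaken for the expected exception.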





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047270252


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -102,8 +106,9 @@ public void testHappyPath() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L)
-    public void testResumeTransaction() throws Exception {
+    @Test
+    @Timeout(30L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   time unit should be specified



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   time unit should be specified





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047266160


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java:
##########
@@ -37,7 +37,7 @@ public KafkaSourceLegacyITCase() throws Exception {
         super(true);
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   ok let's change it to protected





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047271538


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testCommitTransactionAfterClosed() {
         String topicName = "testCommitTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.commitTransaction();
+        assertThatThrownBy(kafkaProducer::commitTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testResumeTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testResumeTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testAbortTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    // TODO verify why 'The producer 951754235 has already been closed' is coming
+    @Disabled

Review Comment:
   why is it disabled?



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testCommitTransactionAfterClosed() {
         String topicName = "testCommitTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.commitTransaction();
+        assertThatThrownBy(kafkaProducer::commitTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testResumeTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   The time unit should be specified explicitly.



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testCommitTransactionAfterClosed() {
         String topicName = "testCommitTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.commitTransaction();
+        assertThatThrownBy(kafkaProducer::commitTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testResumeTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testResumeTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testAbortTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    // TODO verify why 'The producer 951754235 has already been closed' is coming
+    @Disabled
+    void testAbortTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
         kafkaProducer.abortTransaction();
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testFlushAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   The time unit should be specified explicitly.





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034901013


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -72,14 +73,14 @@ public class KafkaEnumeratorTest {
     private static final boolean INCLUDE_DYNAMIC_TOPIC = true;
     private static final boolean EXCLUDE_DYNAMIC_TOPIC = false;
 
-    @BeforeClass
+    @BeforeAll
     public static void setup() throws Throwable {

Review Comment:
   ```suggestion
       static void setup() throws Throwable {
   ```
   could be package private



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -72,14 +73,14 @@ public class KafkaEnumeratorTest {
     private static final boolean INCLUDE_DYNAMIC_TOPIC = true;
     private static final boolean EXCLUDE_DYNAMIC_TOPIC = false;
 
-    @BeforeClass
+    @BeforeAll
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
         KafkaSourceTestEnv.setupTopic(TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopic);
         KafkaSourceTestEnv.setupTopic(TOPIC2, true, true, KafkaSourceTestEnv::getRecordsForTopic);
     }
 
-    @AfterClass
+    @AfterAll
     public static void tearDown() throws Exception {

Review Comment:
   ```suggestion
       static void tearDown() throws Exception {
   ```
   package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034899740


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java:
##########
@@ -17,19 +17,21 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link TransactionsToAbortChecker}. */
-public class TransactionToAbortCheckerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class TransactionToAbortCheckerTest {

Review Comment:
   ```suggestion
   class TransactionToAbortCheckerTest {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034907187


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java:
##########
@@ -45,17 +45,19 @@ public class KafkaSubscriberTest {
     private static final TopicPartition NON_EXISTING_TOPIC = new TopicPartition("removed", 0);
     private static AdminClient adminClient;
 
-    @BeforeClass
+    @BeforeAll
     public static void setup() throws Throwable {
         KafkaSourceTestEnv.setup();
         KafkaSourceTestEnv.createTestTopic(TOPIC1);
         KafkaSourceTestEnv.createTestTopic(TOPIC2);
         adminClient = KafkaSourceTestEnv.getAdminClient();
     }
 
-    @AfterClass
+    @AfterAll
     public static void tearDown() throws Exception {

Review Comment:
   ```suggestion
       static void tearDown() throws Exception {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034891596


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializerTest.java:
##########
@@ -29,7 +30,8 @@
  * Tests for serializing and deserialzing {@link KafkaCommittable} with {@link
  * KafkaCommittableSerializer}.
  */
-public class KafkaCommittableSerializerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Registering the extension via `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
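
As a rough sketch of the service-loader approach suggested above: JUnit 5 can register extensions automatically when they are listed in a `META-INF/services` file and extension auto-detection is enabled. The file locations below (under `src/test/resources`) are an assumption about the module layout, and the module may already inherit either file from a shared test dependency, in which case nothing needs to be added.

```text
# Assumed location: src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
org.apache.flink.util.TestLoggerExtension
```

```properties
# Assumed location: src/test/resources/junit-platform.properties
junit.jupiter.extensions.autodetection.enabled=true
```

With both in place, the per-class `@ExtendWith(TestLoggerExtension.class)` annotation should no longer be needed.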





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034904521


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -166,7 +167,8 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable {
         }
     }
 
-    @Test(timeout = 30000L)
+    @Test
+    @Timeout(30000L)
     public void testDiscoverPartitionsPeriodically() throws Throwable {

Review Comment:
   ```suggestion
       void testDiscoverPartitionsPeriodically() throws Throwable {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034906144


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -145,10 +148,14 @@ public void testSpecificOffsetsInitializer() {
         }
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testSpecifiedOffsetsInitializerWithoutOffsetResetStrategy() {

Review Comment:
   could be package private





[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1275766548

   Thanks for your contribution, @ashmeet-kandhari.
   As I mentioned in the Jira issue, we can continue the code-related discussion here.
   
   Could you please share more details, or a link to the CI run, for the failure you mentioned?
   Also, regarding the changes: I think the changes to `docs/themes/book` are not necessary for this PR and could be removed.




[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323964883

   or just cherry-pick the commits from this PR onto a rebased version




[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1578500753

   Hi @ashmeet-kandhari, I wonder whether you are going to continue working on this PR?




[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1310983666

   Hi @snuyanzin,
   Could you please re-check this PR?
   




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005622582


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java:
##########
@@ -61,7 +59,7 @@
 /** Simple End to End Test for Kafka. */
 public class KafkaShuffleITCase extends KafkaShuffleTestBase {
 
-    @Rule public final Timeout timeout = Timeout.millis(600000L);
+    //    @RegisterExtension public final Timeout timeout = Timeout.millis(600000L);

Review Comment:
   Same here: the timeout should be migrated to JUnit 5 as well.





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1044911351


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java:
##########
@@ -37,7 +37,7 @@ public KafkaSourceLegacyITCase() throws Exception {
         super(true);
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   This method calls `KafkaProducerTestBase.prepare()`,
   which needs to be a protected method.





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047583788


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testCommitTransactionAfterClosed() {
         String topicName = "testCommitTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.commitTransaction();
+        assertThatThrownBy(kafkaProducer::commitTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testResumeTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testResumeTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testAbortTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    // TODO verify why 'The producer 951754235 has already been closed' is coming
+    @Disabled

Review Comment:
   While running this test I was getting the error mentioned in the TODO, so I disabled it for now so that we can discuss it.
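
One possible explanation, not confirmed anywhere in this thread, is a change in how much of the test body the exception check covers. JUnit 4's `expected = IllegalStateException.class` accepts that exception from any statement in the method, whereas `assertThatThrownBy` only covers the lambda passed to it. The sketch below uses plain Java and a hypothetical `ClosedProducer` stand-in (not the real `FlinkKafkaInternalProducer`) under the assumption that every call on a closed producer throws `IllegalStateException`; `catchThrowable` here is a local helper written for the sketch.

```java
final class ExpectedExceptionScope {

    /** Hypothetical stand-in for a producer that has already been closed. */
    static final class ClosedProducer {
        void abortTransaction() {
            throw new IllegalStateException("The producer has already been closed");
        }

        void resumeTransaction() {
            throw new IllegalStateException("The producer has already been closed");
        }
    }

    /** Runs the action and returns whatever it threw, or null if it completed normally. */
    static Throwable catchThrowable(Runnable action) {
        try {
            action.run();
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    /** JUnit 4 shape: the whole body is covered by the expected-exception check. */
    static void junit4StyleBody() {
        ClosedProducer producer = new ClosedProducer();
        producer.abortTransaction(); // throws here; resumeTransaction() is never reached
        producer.resumeTransaction();
    }

    /** Migrated shape: only the lambda is covered by the exception check. */
    static void migratedBody() {
        ClosedProducer producer = new ClosedProducer();
        producer.abortTransaction(); // throws outside the checked lambda and escapes
        Throwable thrown = catchThrowable(producer::resumeTransaction);
        if (!(thrown instanceof IllegalStateException)) {
            throw new AssertionError("expected IllegalStateException");
        }
    }

    /** Possible fix: move the call under test inside the checked lambda. */
    static void fixedBody() {
        ClosedProducer producer = new ClosedProducer();
        Throwable thrown = catchThrowable(producer::abortTransaction);
        if (!(thrown instanceof IllegalStateException)) {
            throw new AssertionError("expected IllegalStateException");
        }
    }

    public static void main(String[] args) {
        // Whole-method expectation: the early throw still satisfies the test.
        System.out.println(
                catchThrowable(ExpectedExceptionScope::junit4StyleBody)
                        instanceof IllegalStateException);
        // After migration the same early throw escapes, so the test errors out.
        System.out.println(catchThrowable(ExpectedExceptionScope::migratedBody));
        // With the call moved into the lambda, nothing escapes.
        System.out.println(catchThrowable(ExpectedExceptionScope::fixedBody));
    }
}
```

If this reading applies to `testAbortTransactionAfterClosed`, wrapping `kafkaProducer.abortTransaction()` in `assertThatThrownBy` instead of `resumeTransaction` might allow the test to be re-enabled; which call the test is meant to cover is for the authors to decide.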



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testCommitTransactionAfterClosed() {
         String topicName = "testCommitTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.commitTransaction();
+        assertThatThrownBy(kafkaProducer::commitTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testResumeTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testResumeTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testAbortTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    // TODO verify why 'The producer 951754235 has already been closed' is coming
+    @Disabled
+    void testAbortTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
         kafkaProducer.abortTransaction();
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testFlushAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   By default the unit is seconds; do we still need to specify it?





[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1330839374

   Hi @ashmeet-kandhari,
   thanks for resolving the conflicts.
   I left some comments (I didn't go through the whole changeset).
   However, there are several kinds of issues that would be nice to address:
   1. With JUnit 5, test classes and test methods can be package-private (except perhaps in some cases involving class hierarchies), so it would be nice to fix that.
   2. Using `META-INF/services/org.junit.jupiter.api.extension.Extension` is a better approach than annotating every class with `@ExtendWith(TestLoggerExtension.class)`.
   3. In JUnit 5 the default timeout unit is seconds, while in JUnit 4 it is milliseconds. This should be taken into account, or perhaps better, the unit should be set explicitly, e.g. `@Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)`.
   4. I noticed that some tests are failing; it would be nice to understand whether that is a result of the changes in this PR or of something else.
   5. GitHub still shows changes to `docs/themes/book` in this PR. They should be reverted, since they have nothing to do with migrating the Kafka connector to JUnit 5.
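
To make the unit difference in item 3 concrete, here is a small plain-Java sketch that does not use JUnit itself. The helper names are invented for illustration; they only model how each framework interprets a bare number: JUnit 4's `@Test(timeout = n)` as milliseconds, JUnit 5's `@Timeout(n)` as seconds unless a unit is given.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

final class TimeoutUnits {

    /** JUnit 4 reads @Test(timeout = n) as milliseconds. */
    static Duration junit4Timeout(long value) {
        return Duration.ofMillis(value);
    }

    /** JUnit 5 reads @Timeout(n) as seconds when no unit is given. */
    static Duration junit5DefaultTimeout(long value) {
        return Duration.ofSeconds(value);
    }

    /** JUnit 5 with an explicit unit, as in @Timeout(value = n, unit = ...). */
    static Duration junit5Timeout(long value, TimeUnit unit) {
        return Duration.ofNanos(unit.toNanos(value));
    }

    public static void main(String[] args) {
        // Original JUnit 4 setting: 30 seconds.
        System.out.println(junit4Timeout(30_000L)); // PT30S
        // Same number copied into @Timeout without a unit: over eight hours.
        System.out.println(junit5DefaultTimeout(30_000L)); // PT8H20M
        // Explicit unit keeps the original meaning.
        System.out.println(junit5Timeout(30_000L, TimeUnit.MILLISECONDS)); // PT30S
        // Converting the number instead also works, but relies on the default.
        System.out.println(junit5DefaultTimeout(30L)); // PT30S
    }
}
```

This is why a line such as `@Timeout(30000L)` carried over unchanged from `@Test(timeout = 30000L)` effectively removes the timeout, while `@Timeout(30L)` preserves it only implicitly.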




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034900090


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java:
##########
@@ -37,7 +37,7 @@ public KafkaSourceLegacyITCase() throws Exception {
         super(true);
     }
 
-    @BeforeClass
+    @BeforeAll
     public static void prepare() throws Exception {

Review Comment:
   ```suggestion
       static void prepare() throws Exception {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034896581


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -158,7 +167,7 @@ public void setUp() throws ExecutionException, InterruptedException, TimeoutExce
         createTestTopic(topic, 1, TOPIC_REPLICATION_FACTOR);
     }
 
-    @After
+    @AfterEach
     public void tearDown() throws ExecutionException, InterruptedException, TimeoutException {

Review Comment:
   ```suggestion
       void tearDown() throws ExecutionException, InterruptedException, TimeoutException {
   ```
   could be package private



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java:
##########
@@ -49,19 +51,21 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
-public class KafkaTransactionLogITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
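   
   A minimal sketch of what that could look like, assuming the extension's fully qualified name is `org.apache.flink.util.TestLoggerExtension` and that the file sits under the module's `src/test/resources` (both are assumptions, not taken from this PR). Note that JUnit 5 only loads extensions registered this way when automatic detection is switched on, for example with `junit.jupiter.extensions.autodetection.enabled=true` in `junit-platform.properties`:
   
   ```text
   # src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
   # (assumed location and class name)
   org.apache.flink.util.TestLoggerExtension
   ```
   
   With such a file in place, the per-class `@ExtendWith(TestLoggerExtension.class)` annotation should no longer be needed.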





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034897981


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##########
@@ -77,6 +77,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the standalone KafkaWriter. */
+@Testcontainers
 @ExtendWith(TestLoggerExtension.class)
 public class KafkaWriterITCase {

Review Comment:
   ```suggestion
   class KafkaWriterITCase {
   ```
   could be package private



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java:
##########
@@ -29,7 +30,8 @@
  * Tests for serializing and deserialzing {@link KafkaWriterState} with {@link
  * KafkaWriterStateSerializer}.
  */
-public class KafkaWriterStateSerializerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034897140


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java:
##########
@@ -49,19 +51,21 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
-public class KafkaTransactionLogITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaTransactionLogITCase {

Review Comment:
   ```suggestion
   class KafkaTransactionLogITCase {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034892520


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -45,15 +46,16 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
-public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaRecordSerializationSchemaBuilderTest {

Review Comment:
   ```suggestion
   class KafkaRecordSerializationSchemaBuilderTest {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323917212

   It means that there are some new files in your PR without a license...
   I guess there should not be any new files in this PR.
   You can find the list of new files in the already mentioned `D:\Projects\Intellij\git\flink\target\rat.txt`.




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034900636


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java:
##########
@@ -91,7 +91,7 @@ public void testMultipleSourcesOnePartition() throws Exception {
     // --- broker failure ---
 
     @Test
-    @Ignore("FLINK-28267")
+    @Disabled("FLINK-28267")
     public void testBrokerFailure() throws Exception {

Review Comment:
   ```suggestion
       void testBrokerFailure() throws Exception {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034905020


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -25,24 +25,25 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link OffsetsInitializer}. */
 public class OffsetsInitializerTest {

Review Comment:
   ```suggestion
   class OffsetsInitializerTest {
   ```
   could be package private





[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323938824

   It doesn't show any diff.
   I tried checking the history as well, and it shows nothing.
   ![image](https://user-images.githubusercontent.com/37675804/203368442-7e42d3cb-8e2c-4a49-a195-aec35b66da7c.png)
   
   
   Maybe I need to cancel the merge and try again?




[GitHub] [flink] flinkbot commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1272375576

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ff2e4a8ff3a6444267e2a2c9f03a4e3a057e9619",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ff2e4a8ff3a6444267e2a2c9f03a4e3a057e9619",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ff2e4a8ff3a6444267e2a2c9f03a4e3a057e9619 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1354073289

   Hi @snuyanzin ,
   
   I have given you access to my repo.
   




[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047271926


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testCommitTransactionAfterClosed() {
         String topicName = "testCommitTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.commitTransaction();
+        assertThatThrownBy(kafkaProducer::commitTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testResumeTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testResumeTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testAbortTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    // TODO verify why 'The producer 951754235 has already been closed' is coming
+    @Disabled
+    void testAbortTransactionAfterClosed() {
         String topicName = "testAbortTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
         kafkaProducer.abortTransaction();
-        kafkaProducer.resumeTransaction(0L, (short) 1);
+        assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testFlushAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testFlushAfterClosed() {
         String topicName = "testCommitTransactionAfterClosed";
         FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
-        kafkaProducer.flush();
+        assertThatThrownBy(kafkaProducer::flush).isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L)
-    public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
+    @Test
+    @Timeout(30L)

Review Comment:
   The time unit should be specified explicitly.
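   
   To make the unit difference concrete, here is a small worked conversion using only the JDK. JUnit 4's `@Test(timeout = ...)` takes milliseconds, while JUnit 5's `@Timeout` defaults to seconds, so a bare number is easy to misread; an explicit form such as `@Timeout(value = 30, unit = TimeUnit.SECONDS)` avoids that. The class and method names below are hypothetical and only illustrate the arithmetic, they are not part of this PR:
   
   ```java
   import java.util.concurrent.TimeUnit;
   
   /** Hypothetical helper: worked conversion between the JUnit 4 and JUnit 5 timeout units. */
   final class TimeoutUnits {
   
       private TimeoutUnits() {}
   
       /** Converts a JUnit 4 style timeout (milliseconds) to whole seconds. */
       static long junit4MillisToSeconds(long junit4TimeoutMillis) {
           return TimeUnit.MILLISECONDS.toSeconds(junit4TimeoutMillis);
       }
   
       /** Shows how long a bare number lasts, in milliseconds, when it is read as seconds. */
       static long secondsToMillis(long seconds) {
           return TimeUnit.SECONDS.toMillis(seconds);
       }
   
       public static void main(String[] args) {
           // The old @Test(timeout = 30000L) corresponds to a 30 second limit.
           System.out.println(junit4MillisToSeconds(30_000L));
           // Copying 30000 unchanged into a seconds-based timeout would allow 30,000,000 ms.
           System.out.println(secondsToMillis(30_000L));
       }
   }
   ```
   
   So `@Timeout(30L)` in the diff matches the old 30000 ms limit; spelling out the unit just makes that visible to the reader.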





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047272871


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java:
##########
@@ -68,7 +68,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
     protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
             new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
 
-    @Before
+    @BeforeEach
     public void before() {

Review Comment:
   could be package private, I guess



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java:
##########
@@ -171,7 +171,7 @@ public void testRestoreToCheckpointAfterExceedingProducersPool() throws Exceptio
 
     /** This test hangs when running it in your IDE. */
     @Test
-    @Ignore
+    @Disabled
     public void testFlinkKafkaProducerFailBeforeNotify() throws Exception {

Review Comment:
   could be package private





[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1345624195

   
   
   
   > Hi @ashmeet-kandhari, thanks for resolving the conflicts. I left some comments (I didn't go through the whole changeset); however, there are several types of things that would be nice to address:
   > 
   > 1. In JUnit 5, test classes and test methods can be package private (maybe except in some cases with hierarchies), so it would be nice to have that fixed.
   > 2. Using `META-INF/services/org.junit.jupiter.api.extension.Extension` is a better approach here than annotating every class with `@ExtendWith(TestLoggerExtension.class)`.
   > 3. In JUnit 5 the default timeout unit is seconds, while in JUnit 4 it is milliseconds. This should be taken into account, or it may be better to set the unit explicitly, like `@Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)`.
   > 4. I noticed there are failing tests; it would be nice to understand whether they are a result of the changes in this PR or of something else.
   > 5. GitHub still shows changes for `docs/themes/book` in this PR. They should be reverted since they have nothing to do with the Kafka connector's JUnit 5 migration.
   
   Hi @snuyanzin 
   
   I have addressed the first three points from the comments above.
   
   Regarding the rest of the comments:
    * _4th comment:_ From what I could see, the tests failed for two reasons:
       1. Compilation failed, and the suggestion was to run `mvn spotless:apply`. I always run it before committing, though. I have now tested the new changes locally and the tests run fine, so let's see whether this comes up again.
       2. The `docs_404_checks` stage failed with the error below. I am not sure what it means, as I haven't changed anything there:
      > Error: Error building site: "/home/vsts/work/1/s/docs/content.zh/_index.md:36:1": failed to extract shortcode: template for shortcode "columns" not found
   
   * _5th comment:_ I might need some help with reverting that change, as I cannot see anything in IntelliJ that points to changes made in that folder.
   
   




Re: [PR] [FLINK-25538][flink-connector-kafka] JUnit5 Migration [flink]

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser closed pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
URL: https://github.com/apache/flink/pull/20991




[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1006434961


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java:
##########
@@ -38,16 +39,16 @@
  * <p>For regenerating the binary snapshot files run {@link #writeSnapshot()} on the corresponding
  * Flink release-* branch.
  */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
-    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
+    @Parameters(name = "Migration Savepoint: {0}")
     public static Collection<FlinkVersion> parameters() {
         return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_15);
     }
 
-    public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
-        super(testMigrateVersion);
-    }
+    //    public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
+    //        super(testMigrateVersion);
+    //    }

Review Comment:
   I wanted to check before I removed it.
   Do you think we still need the constructor after migration to junit 5 or can we remove it?





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1006431695


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
  */
 @SuppressWarnings("serial")
 @RetryOnFailure(times = 3)

Review Comment:
   Hi @snuyanzin 
   
   I went through the `RetryOnException` Javadoc, and it mentions the same thing:
   
   > Annotation to use with {@link org.apache.flink.testutils.junit.RetryRule}.
   > 
   > <p>Add the {@link org.apache.flink.testutils.junit.RetryRule} to your test class and annotate the
   >  class and/or tests with {@link RetryOnException}.
   
   I also went through the `RetryExtension` source code, and it looks like it reads both annotations (`RetryOnException` and `RetryOnFailure`).
   
   Do you still think it makes sense to migrate to the `RetryOnException` annotation?
   





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005612119


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java:
##########
@@ -39,7 +37,7 @@
 /** Failure Recovery IT Test for KafkaShuffle. */
 public class KafkaShuffleExactlyOnceITCase extends KafkaShuffleTestBase {
 
-    @Rule public final Timeout timeout = Timeout.millis(600000L);
+    //    @RegisterExtension public final Timeout timeout = Timeout.millis(600000L);

Review Comment:
   Well, to get the same behavior in JUnit 5, the timeout should be propagated to all the tests.





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005624592


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java:
##########
@@ -38,16 +39,16 @@
  * <p>For regenerating the binary snapshot files run {@link #writeSnapshot()} on the corresponding
  * Flink release-* branch.
  */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
-    @Parameterized.Parameters(name = "Migration Savepoint: {0}")
+    @Parameters(name = "Migration Savepoint: {0}")
     public static Collection<FlinkVersion> parameters() {
         return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_15);
     }
 
-    public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
-        super(testMigrateVersion);
-    }
+    //    public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
+    //        super(testMigrateVersion);
+    //    }

Review Comment:
   Do we really need to keep it commented out in the repo?





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005629556


##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\

Review Comment:
   You can just remove the corresponding git commit; that should be enough.





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034905365


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -25,24 +25,25 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Unit tests for {@link OffsetsInitializer}. */
 public class OffsetsInitializerTest {
     private static final String TOPIC = "topic";
     private static final String TOPIC2 = "topic2";
     private static KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl retriever;
 
-    @BeforeClass
+    @BeforeAll
     public static void setup() throws Throwable {

Review Comment:
   ```suggestion
       static void setup() throws Throwable {
   ```
   could be package private





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034891884


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializerTest.java:
##########
@@ -29,7 +30,8 @@
  * Tests for serializing and deserialzing {@link KafkaCommittable} with {@link
  * KafkaCommittableSerializer}.
  */
-public class KafkaCommittableSerializerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaCommittableSerializerTest {

Review Comment:
   ```suggestion
   class KafkaCommittableSerializerTest {
   ```
   could be package private



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -45,15 +46,16 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
-public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)

Review Comment:
   Registering the extension through `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here, instead of annotating each test class with `@ExtendWith`.
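
Note: for readers unfamiliar with JUnit 5's automatic extension registration, the suggestion above amounts to roughly the following two test resources. This is a sketch under assumptions: the resource paths and the fully qualified name `org.apache.flink.util.TestLoggerExtension` are not shown in this diff and may differ in the actual module.

```text
# src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
# (assumed location; one fully qualified extension class name per line)
org.apache.flink.util.TestLoggerExtension

# src/test/resources/junit-platform.properties
# (auto-detection is off by default and has to be switched on)
junit.jupiter.extensions.autodetection.enabled = true
```

With both files on the test classpath, the extension would apply to every Jupiter test in the module, so the per-class `@ExtendWith(TestLoggerExtension.class)` annotations could be dropped.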





[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034895369


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -123,18 +130,19 @@ public class KafkaSinkITCase extends TestLogger {
     private SharedReference<AtomicBoolean> failed;
     private SharedReference<AtomicLong> lastCheckpointedRecord;
 
-    @ClassRule
+    @Container
     public static final KafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KAFKA, LOG)
                     .withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
-    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+    @RegisterExtension
+    public final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
 
-    @Rule public final TemporaryFolder temp = new TemporaryFolder();
+    @TempDir public Path temp;
 
-    @BeforeClass
+    @BeforeAll
     public static void setupAdmin() {

Review Comment:
   ```suggestion
       static void setupAdmin() {
   ```
   could be package private





[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1342895879

   Hi @snuyanzin,
   
   I am on vacation and don't have access to my laptop.
   I will address the comments by this weekend.
   
   Thanks for waiting :)




[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047578778


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -428,13 +434,13 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
                 admin.createTopics(
                         Collections.singletonList(
                                 new NewTopic(topic, numPartitions, replicationFactor)));
-        result.all().get();
+        result.all().get(1, TimeUnit.MINUTES);

Review Comment:
   I don't remember exactly why I added that.
   I will remove it for now.
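
Note on the hunk above: `get()` without arguments blocks until the future completes, while `get(1, TimeUnit.MINUTES)` gives up with a `TimeoutException` once the deadline passes. The following is a minimal, dependency-free sketch of that difference. `CompletableFuture` is used as a stand-in for the Kafka admin client's future, and `BoundedWaitSketch` and `awaitOrFallback` are hypothetical names that are not part of this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; CompletableFuture stands in for the admin client's future.
class BoundedWaitSketch {

    /** Waits at most timeoutMillis for the future; returns fallback on timeout. */
    static String awaitOrFallback(
            CompletableFuture<String> future, long timeoutMillis, String fallback) {
        try {
            // Bounded wait: throws TimeoutException instead of blocking forever.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> completed = CompletableFuture.completedFuture("created");
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();

        // A completed future returns its value immediately.
        System.out.println(awaitOrFallback(completed, 100, "timed out"));
        // A future that never completes hits the deadline and yields the fallback.
        System.out.println(awaitOrFallback(neverCompletes, 100, "timed out"));
    }
}
```

In a test, the bounded variant turns a hung topic creation into a fast, explicit failure, whereas the unbounded variant relies on an outer test timeout to end the wait.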





[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047582360


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
         deleteTestTopic(topicName);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testPartitionsForAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testPartitionsForAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.partitionsFor("Topic");
+        assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testInitTransactionsAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testInitTransactionsAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.initTransactions();
+        assertThatThrownBy(kafkaProducer::initTransactions)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testBeginTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)
+    void testBeginTransactionAfterClosed() {
         FlinkKafkaInternalProducer<String, String> kafkaProducer =
                 new FlinkKafkaInternalProducer<>(extraProperties);
         kafkaProducer.initTransactions();
         kafkaProducer.close(Duration.ofSeconds(5));
-        kafkaProducer.beginTransaction();
+        assertThatThrownBy(kafkaProducer::beginTransaction)
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(timeout = 30000L, expected = IllegalStateException.class)
-    public void testCommitTransactionAfterClosed() {
+    @Test
+    @Timeout(30L)

Review Comment:
   By default it takes seconds; do we still need to specify the unit?
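
Note on the `expected = ...` to `assertThatThrownBy(...)` change in the hunk above: with JUnit 4's `expected`, the test passes if the exception is thrown anywhere in the method, including the setup calls. Wrapping only the call under test narrows the assertion to that one statement. The following dependency-free sketch illustrates the idea without JUnit or AssertJ. `FakeProducer` and `throwsAt` are hypothetical stand-ins, not the real producer or the AssertJ API.

```java
// Hypothetical sketch of asserting an exception on one specific call.
class NarrowExceptionSketch {

    /** Stand-in for a producer that rejects calls after close(). */
    static class FakeProducer {
        private boolean closed;

        void close() {
            closed = true;
        }

        void partitionsFor(String topic) {
            if (closed) {
                throw new IllegalStateException("producer is closed");
            }
        }
    }

    /** Returns true only if running the action throws an instance of expected. */
    static boolean throwsAt(Runnable action, Class<? extends Throwable> expected) {
        try {
            action.run();
            return false; // nothing was thrown
        } catch (Throwable t) {
            return expected.isInstance(t);
        }
    }

    public static void main(String[] args) {
        FakeProducer closedProducer = new FakeProducer();
        closedProducer.close(); // setup step: not covered by the assertion

        // Only this call is checked for the exception.
        System.out.println(
                throwsAt(() -> closedProducer.partitionsFor("Topic"), IllegalStateException.class));

        // An open producer does not throw, so the check reports false.
        FakeProducer openProducer = new FakeProducer();
        System.out.println(
                throwsAt(() -> openProducer.partitionsFor("Topic"), IllegalStateException.class));
    }
}
```

If `close()` itself threw an `IllegalStateException`, the JUnit 4 form would still pass, while the narrowed form would surface it as a test failure.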





[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1354074250

   Hi @snuyanzin 
   
   I have recently started running into an issue where I am not able to run the tests.
   Any idea what could be causing this?
   
   > [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) on project flink-connector-kafka: Execution default-test of goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' failed to discover tests: com.tngtech.archunit.library.freeze.FreezingArchRule.allowEmptyShould(Z)Lcom/tngtech/archunit/lang/ArchRule; -> [Help 1]
   




[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1354510303

   I see lots of different issues, and it's quite tricky to debug all of them...
   I would suggest going back to a state where all the tests pass. For example, I picked the first commit from this PR and rebased it onto the latest master; it is currently in my branch (https://github.com/snuyanzin/flink/commit/57cd7804fdd88666e7dfab8049549a6d739ec505). The other commits already have issues.
   The proposal is to pick this commit and go step by step (or test by test) until the first error, then fix that error so that all the tests are green, and only then continue with the next test.
   
   What do you think?

