You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/08 18:31:52 UTC
[GitHub] [flink] ashmeet-kandhari opened a new pull request, #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
ashmeet-kandhari opened a new pull request, #20991:
URL: https://github.com/apache/flink/pull/20991
## What is the purpose of the change
*This pull request migrates the flink-connector-kafka module unit test cases to Junit 5*
## Brief change log
- *Updated simple junit 4 test packages to junit 5 test packages*
- *Disabled a few failing test cases with added TODO comment to discuss or get more info around them*
## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by "ashmeet-kandhari (via GitHub)" <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1416496203
Hi @snuyanzin
I am thinking of creating new branch and start slowly
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047274424
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -49,9 +50,9 @@
import java.util.UUID;
/** IT cases for Kafka. */
-public class KafkaITCase extends KafkaConsumerTestBase {
+class KafkaITCase extends KafkaConsumerTestBase {
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
could be package private or protected i guess
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047270638
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
time unit should be specified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047272272
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -219,7 +227,8 @@ public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
* pending records. The test for that is covered in testAtLeastOnceProducer.
*/
@SuppressWarnings("unchecked")
- @Test(timeout = 5000)
+ @Test
+ @Timeout(5L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -280,7 +289,8 @@ public void go() throws Exception {
* the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
- @Test(timeout = 10000)
+ @Test
+ @Timeout(10L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -353,7 +363,8 @@ public void go() throws Exception {
* records; we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
- @Test(timeout = 5000)
+ @Test
+ @Timeout(10L)
Review Comment:
time unit should be specified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by "ashmeet-kandhari (via GitHub)" <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1613909720
Hi @sergey,
Unfortunately, I will not be able to work on this due to some personal work.
Better to reassign this task
Regards,
Ashmeet.
On Fri, Jun 16, 2023, 04:24 ashmeet kandhari ***@***.***>
wrote:
> Hi @Sergey,
>
> I am on vacation and will be back in July.
>
> I can continue after that.
>
> Thanks
> Ashmeet
>
> On Tue, Jun 6, 2023, 13:11 Sergey Nuyanzin ***@***.***>
> wrote:
>
>> Hi @ashmeet-kandhari <https://github.com/ashmeet-kandhari> I wonder
>> whether you going to continue working on this PR or not?
>>
>> —
>> Reply to this email directly, view it on GitHub
>> <https://github.com/apache/flink/pull/20991#issuecomment-1578500753>, or
>> unsubscribe
>> <https://github.com/notifications/unsubscribe-auth/AI7OGHGWRFOJOPZSGQCUWQDXJ4F6HANCNFSM6AAAAAARALJTVI>
>> .
>> You are receiving this because you were mentioned.Message ID:
>> ***@***.***>
>>
>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1279718997
> Thanks for your contribution @ashmeet-kandhari As I mentioned in jira issue we could continue code related discussions here
>
> Could you please share more detail/link to ci with failure you've mentioned? also about changes: i think changes of `docs/themes/book` are not necessary for this PR and could be removed
Hi @snuyanzin,
I was testing the migration changes made to KafkaTableITCase and UpsertKafkaTableITCase locally, I have pushed it now
Will see the CI again and let you know if it's failing.
I will also slowly revert the changes made in `docs/themes/book` not sure what caused them, as I have not touched it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034898677
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java:
##########
@@ -29,7 +30,8 @@
* Tests for serializing and deserialzing {@link KafkaWriterState} with {@link
* KafkaWriterStateSerializer}.
*/
-public class KafkaWriterStateSerializerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaWriterStateSerializerTest {
Review Comment:
```suggestion
class KafkaWriterStateSerializerTest {
```
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionIdFactoryTest.java:
##########
@@ -17,14 +17,16 @@
package org.apache.flink.connector.kafka.sink;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link TransactionalIdFactory}. */
-public class TransactionIdFactoryTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
Review Comment:
Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047582623
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testCommitTransactionAfterClosed() {
String topicName = "testCommitTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.commitTransaction();
+ assertThatThrownBy(kafkaProducer::commitTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testResumeTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testResumeTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testAbortTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
By default it takes seconds, do we still need to specify?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1348765223
it seems that 4-th and 5-th problems are connected
regarding
> 5th Comment: I might need some help on how to revert the change for that. As I cannot see anything in intellij that points to changes made in that folder
you can remove a dedicated commit or give me grants to your branch, and i can do it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047274864
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1000270961
##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
Review Comment:
Is there a good reason why archunit violations should be updated?
Otherwise i think it should be reverted
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1003145131
##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
Review Comment:
Hi @snuyanzin
These archunit violations are getting updated automatically after I build the `flink-connector-kafka` module.
As I have not been interacting with these files, I thought these are expected changes.
Can you please advice me what might be going wrong as the this file gets updated automatically on building the module?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Re: [PR] [FLINK-25538][flink-connector-kafka] JUnit5 Migration [flink]
Posted by "davidradl (via GitHub)" <gi...@apache.org>.
davidradl commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1746931693
@snuyanzin I see https://github.com/apache/flink/pull/22797 , this pr is no longer relevant to this repository, I suggest closing this pr and porting the change to the [new Flink connector kafka repo]( https://github.com/apache/flink-connector-kafka) if it is still relevant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047581701
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -102,8 +106,9 @@ public void testHappyPath() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L)
- public void testResumeTransaction() throws Exception {
+ @Test
+ @Timeout(30L)
Review Comment:
By default it takes seconds, do we still need to specify?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047268582
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -29,25 +29,28 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for our own {@link FlinkKafkaInternalProducer}. */
@SuppressWarnings("serial")
-public class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
+class FlinkKafkaInternalProducerITCase extends KafkaTestBase {
protected String transactionalId;
protected Properties extraProperties;
private volatile Exception exceptionInCallback;
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
could be package private i guess
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -64,7 +67,7 @@ public static void prepare() throws Exception {
.setKafkaServerProperties(serverProperties));
}
- @Before
+ @BeforeEach
public void before() {
Review Comment:
could be package private i guess
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1003145898
##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
Review Comment:
I also tried reverting these file changes and just run `mvn test` command on this module alone and again the file gets updated automatically
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1303971800
Hi @snuyanzin
I have made changes for the comments added.
Let me know if anything else is there
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034893121
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java:
##########
@@ -30,7 +31,8 @@
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link KafkaSinkBuilder}. */
-public class KafkaSinkBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
Review Comment:
Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034895784
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -143,12 +151,13 @@ public static void setupAdmin() {
admin = AdminClient.create(properties);
}
- @AfterClass
+ @AfterAll
public static void teardownAdmin() {
Review Comment:
```suggestion
static void teardownAdmin() {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034906776
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java:
##########
@@ -45,17 +45,19 @@ public class KafkaSubscriberTest {
private static final TopicPartition NON_EXISTING_TOPIC = new TopicPartition("removed", 0);
private static AdminClient adminClient;
- @BeforeClass
+ @BeforeAll
public static void setup() throws Throwable {
Review Comment:
could be package private
```suggestion
static void setup() throws Throwable {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1006434223
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java:
##########
@@ -39,7 +37,7 @@
/** Failure Recovery IT Test for KafkaShuffle. */
public class KafkaShuffleExactlyOnceITCase extends KafkaShuffleTestBase {
- @Rule public final Timeout timeout = Timeout.millis(600000L);
+ // @RegisterExtension public final Timeout timeout = Timeout.millis(600000L);
Review Comment:
fixed it
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java:
##########
@@ -61,7 +59,7 @@
/** Simple End to End Test for Kafka. */
public class KafkaShuffleITCase extends KafkaShuffleTestBase {
- @Rule public final Timeout timeout = Timeout.millis(600000L);
+ // @RegisterExtension public final Timeout timeout = Timeout.millis(600000L);
Review Comment:
fixed it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047275232
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromEarliestOffsets() throws Exception {
runStartFromEarliestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromLatestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromLatestOffsets() throws Exception {
runStartFromLatestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromGroupOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromEarliestOffsets() throws Exception {
runStartFromEarliestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromLatestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromEarliestOffsets() throws Exception {
runStartFromEarliestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromLatestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromLatestOffsets() throws Exception {
runStartFromLatestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromGroupOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromGroupOffsets() throws Exception {
runStartFromGroupOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromSpecificOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -79,20 +77,20 @@ public String getOperatorSnapshotPath(FlinkVersion version) {
* Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
* manually
*/
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {}
/**
* Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
* manually
*/
- @AfterClass
+ @AfterAll
public static void shutDownServices() throws Exception {}
Review Comment:
could be package private or protected i guess
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -65,16 +68,16 @@ public class KafkaShortRetentionTestBase implements Serializable {
private static KafkaTestEnvironment kafkaServer;
private static Properties standardProps;
- @ClassRule
- public static MiniClusterWithClientResource flink =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ public static MiniClusterExtension flink =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(TM_SLOTS)
.build());
- @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+ @TempDir public static Path tempFolder;
Review Comment:
do we need it `public`?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -84,7 +87,7 @@ private static Configuration getConfiguration() {
return flinkConfig;
}
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
do we need it `public`
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -92,17 +93,15 @@ public abstract class KafkaTestBase extends TestLogger {
public static KafkaTestEnvironment kafkaServer;
- @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+ @TempDir public static Path tempFolder;
public static Properties secureProps = new Properties();
- @Rule public final RetryRule retryRule = new RetryRule();
-
// ------------------------------------------------------------------------
// Setup and teardown of the mini clusters
// ------------------------------------------------------------------------
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
do we need to have it `public`?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromEarliestOffsets() throws Exception {
runStartFromEarliestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromLatestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromLatestOffsets() throws Exception {
runStartFromLatestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromGroupOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromGroupOffsets() throws Exception {
runStartFromGroupOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromSpecificOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromSpecificOffsets() throws Exception {
runStartFromSpecificOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromTimestamp() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromEarliestOffsets() throws Exception {
runStartFromEarliestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromLatestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromLatestOffsets() throws Exception {
runStartFromLatestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromGroupOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromGroupOffsets() throws Exception {
runStartFromGroupOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromSpecificOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromSpecificOffsets() throws Exception {
runStartFromSpecificOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromTimestamp() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromTimestamp() throws Exception {
runStartFromTimestamp();
}
// --- offset committing ---
- @Test(timeout = 60000)
- public void testCommitOffsetsToKafka() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCommitOffsetsToKafka() throws Exception {
runCommitOffsetsToKafka();
}
- @Test(timeout = 60000)
- public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
runAutoOffsetRetrievalAndCommitToKafka();
}
- @Test(timeout = 60000)
- public void testCollectingSchema() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -92,17 +93,15 @@ public abstract class KafkaTestBase extends TestLogger {
public static KafkaTestEnvironment kafkaServer;
- @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+ @TempDir public static Path tempFolder;
Review Comment:
do we need to have it `public`?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -79,20 +77,20 @@ public String getOperatorSnapshotPath(FlinkVersion version) {
* Override {@link KafkaTestBase}. Kafka Migration Tests are starting up Kafka/ZooKeeper cluster
* manually
*/
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {}
Review Comment:
could be package private or protected i guess
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromEarliestOffsets() throws Exception {
runStartFromEarliestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromLatestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromLatestOffsets() throws Exception {
runStartFromLatestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromGroupOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromGroupOffsets() throws Exception {
runStartFromGroupOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromSpecificOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromSpecificOffsets() throws Exception {
runStartFromSpecificOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromTimestamp() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromTimestamp() throws Exception {
runStartFromTimestamp();
}
// --- offset committing ---
- @Test(timeout = 60000)
- public void testCommitOffsetsToKafka() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCommitOffsetsToKafka() throws Exception {
runCommitOffsetsToKafka();
}
- @Test(timeout = 60000)
- public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
runAutoOffsetRetrievalAndCommitToKafka();
}
- @Test(timeout = 60000)
- public void testCollectingSchema() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCollectingSchema() throws Exception {
runCollectingSchemaTest();
}
/** Kafka 20 specific test, ensuring Timestamps are properly written to and read from Kafka. */
- @Test(timeout = 60000)
- public void testTimestamps() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromEarliestOffsets() throws Exception {
runStartFromEarliestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromLatestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromLatestOffsets() throws Exception {
runStartFromLatestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromGroupOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromGroupOffsets() throws Exception {
runStartFromGroupOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromSpecificOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromSpecificOffsets() throws Exception {
runStartFromSpecificOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromTimestamp() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromTimestamp() throws Exception {
runStartFromTimestamp();
}
// --- offset committing ---
- @Test(timeout = 60000)
- public void testCommitOffsetsToKafka() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
*/
@SuppressWarnings("serial")
@RetryOnFailure(times = 3)
Review Comment:
yes it makes sense
it will simplify to remove junit4 related stuff
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerAtLeastOnceITCase.java:
##########
@@ -18,13 +18,13 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.BeforeAll;
/** IT cases for the {@link FlinkKafkaProducer}. */
@SuppressWarnings("serial")
public class KafkaProducerAtLeastOnceITCase extends KafkaProducerTestBase {
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -113,11 +116,14 @@ public static void prepare() throws Exception {
standardProps = kafkaServer.getStandardProperties();
}
- @AfterClass
- public static void shutDownServices() throws Exception {
+ @AfterAll
+ public static void shutDownServices(@InjectMiniCluster MiniCluster miniCluster)
+ throws Exception {
kafkaServer.shutdown();
secureProps.clear();
+
+ miniCluster.close();
Review Comment:
why should we start to close something if we didn't open it?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBigRecordJob() throws Exception {
runBigRecordTestTopology();
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithLegacySerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithLegacySerializer() throws Exception {
runProduceConsumeMultipleTopics(true);
}
- @Test(timeout = 60000)
- public void testMultipleTopicsWithKafkaSerializer() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleTopicsWithKafkaSerializer() throws Exception {
runProduceConsumeMultipleTopics(false);
}
- @Test(timeout = 60000)
- public void testAllDeletes() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testAllDeletes() throws Exception {
runAllDeletesTest();
}
- @Test(timeout = 60000)
- public void testMetricsAndEndOfStream() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMetricsAndEndOfStream() throws Exception {
runEndOfStreamTest();
}
// --- startup mode ---
- @Test(timeout = 60000)
- public void testStartFromEarliestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromEarliestOffsets() throws Exception {
runStartFromEarliestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromLatestOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromLatestOffsets() throws Exception {
runStartFromLatestOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromGroupOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromGroupOffsets() throws Exception {
runStartFromGroupOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromSpecificOffsets() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromSpecificOffsets() throws Exception {
runStartFromSpecificOffsets();
}
- @Test(timeout = 60000)
- public void testStartFromTimestamp() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testStartFromTimestamp() throws Exception {
runStartFromTimestamp();
}
// --- offset committing ---
- @Test(timeout = 60000)
- public void testCommitOffsetsToKafka() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCommitOffsetsToKafka() throws Exception {
runCommitOffsetsToKafka();
}
- @Test(timeout = 60000)
- public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBaseWithFlink.java:
##########
@@ -30,9 +30,9 @@ public abstract class KafkaTestBaseWithFlink extends KafkaTestBase {
protected static final int TM_SLOTS = 8;
- @ClassRule
- public static MiniClusterWithClientResource flink =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ public static final MiniClusterExtension FLINK =
Review Comment:
do we need to have it `public`?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java:
##########
@@ -113,11 +116,14 @@ public static void prepare() throws Exception {
standardProps = kafkaServer.getStandardProperties();
}
- @AfterClass
- public static void shutDownServices() throws Exception {
+ @AfterAll
+ public static void shutDownServices(@InjectMiniCluster MiniCluster miniCluster)
Review Comment:
do we need it `public`?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -111,7 +110,7 @@ public static void prepare() throws Exception {
startClusters(false);
}
- @AfterClass
+ @AfterAll
public static void shutDownServices() throws Exception {
Review Comment:
do we need to have it `public`?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java:
##########
@@ -128,15 +126,19 @@ private OperatorSubtaskState initializeTestState() throws Exception {
}
@SuppressWarnings("warning")
- @Test
+ @TestTemplate
public void testRestoreProducer() throws Exception {
Review Comment:
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExactlyOnceITCase.java:
##########
@@ -18,21 +18,21 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
/** IT cases for the {@link FlinkKafkaProducer}. */
@SuppressWarnings("serial")
-public class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
- @BeforeClass
+class KafkaProducerExactlyOnceITCase extends KafkaProducerTestBase {
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java:
##########
@@ -62,131 +63,154 @@ public static void prepare() throws Exception {
// Suite of Tests
// ------------------------------------------------------------------------
- @Test(timeout = 120000)
- public void testFailOnNoBroker() throws Exception {
+ @Test
+ @Timeout(120L)
+ void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}
- @Test(timeout = 60000)
- public void testConcurrentProducerConsumerTopology() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testConcurrentProducerConsumerTopology() throws Exception {
runSimpleConcurrentProducerConsumerTopology();
}
- @Test(timeout = 60000)
- public void testKeyValueSupport() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testKeyValueSupport() throws Exception {
runKeyValueTest();
}
// --- canceling / failures ---
- @Test(timeout = 60000)
- public void testCancelingEmptyTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingEmptyTopic() throws Exception {
runCancelingOnEmptyInputTest();
}
- @Test(timeout = 60000)
- public void testCancelingFullTopic() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
// --- source to partition mappings and exactly once ---
- @Test(timeout = 60000)
- public void testOneToOneSources() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneToOneSources() throws Exception {
runOneToOneExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testOneSourceMultiplePartitions() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testOneSourceMultiplePartitions() throws Exception {
runOneSourceMultiplePartitionsExactlyOnceTest();
}
- @Test(timeout = 60000)
- public void testMultipleSourcesOnePartition() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testMultipleSourcesOnePartition() throws Exception {
runMultipleSourcesOnePartitionExactlyOnceTest();
}
// --- broker failure ---
- @Test(timeout = 60000)
- public void testBrokerFailure() throws Exception {
+ @Test
+ @Timeout(60L)
+ void testBrokerFailure() throws Exception {
runBrokerFailureTest();
}
// --- special executions ---
- @Test(timeout = 60000)
- public void testBigRecordJob() throws Exception {
+ @Test
+ @Timeout(60L)
Review Comment:
time unit should be specified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1003145898
##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
Review Comment:
I also tried reverting these file changes and just run `mvn test` command on this module alone and again the file gets updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1318656114
As I see there are some conflicts could you please resolve them?
Also it seems build failed because of `spotless` issues could you please run `mvn spotless:apply` before commit. It will do autoformatting
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034892886
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -45,15 +46,16 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
-public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaRecordSerializationSchemaBuilderTest {
private static final String DEFAULT_TOPIC = "test";
private static Map<String, ?> configurableConfiguration;
private static Map<String, ?> configuration;
private static boolean isKeySerializer;
- @Before
+ @BeforeEach
public void setUp() {
Review Comment:
```suggestion
void setUp() {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1342859067
hi @ashmeet-kandhari do you have any questions/concerns/whatever?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by "ashmeet-kandhari (via GitHub)" <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1594034092
Hi @Sergey,
I am on vacation and will be back in July.
I can continue after that.
Thanks
Ashmeet
On Tue, Jun 6, 2023, 13:11 Sergey Nuyanzin ***@***.***> wrote:
> Hi @ashmeet-kandhari <https://github.com/ashmeet-kandhari> I wonder
> whether you going to continue working on this PR or not?
>
> —
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/flink/pull/20991#issuecomment-1578500753>, or
> unsubscribe
> <https://github.com/notifications/unsubscribe-auth/AI7OGHGWRFOJOPZSGQCUWQDXJ4F6HANCNFSM6AAAAAARALJTVI>
> .
> You are receiving this because you were mentioned.Message ID:
> ***@***.***>
>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034894549
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -108,7 +113,9 @@
import static org.assertj.core.api.Assertions.fail;
/** Tests for using KafkaSink writing to a Kafka cluster. */
-public class KafkaSinkITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
Review Comment:
Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034899201
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionIdFactoryTest.java:
##########
@@ -17,14 +17,16 @@
package org.apache.flink.connector.kafka.sink;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link TransactionalIdFactory}. */
-public class TransactionIdFactoryTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class TransactionIdFactoryTest {
Review Comment:
```suggestion
class TransactionIdFactoryTest {
```
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java:
##########
@@ -17,19 +17,21 @@
package org.apache.flink.connector.kafka.sink;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link TransactionsToAbortChecker}. */
-public class TransactionToAbortCheckerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
Review Comment:
Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034897483
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java:
##########
@@ -49,19 +51,21 @@
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
-public class KafkaTransactionLogITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaTransactionLogITCase {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkITCase.class);
private static final String TOPIC_NAME = "kafkaTransactionLogTest";
private static final String TRANSACTIONAL_ID_PREFIX = "kafka-log";
- @ClassRule
+ @Container
public static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();
- @After
+ @AfterEach
public void tearDown() {
Review Comment:
```suggestion
void tearDown() {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034904171
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -166,7 +167,8 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable {
}
}
- @Test(timeout = 30000L)
+ @Test
+ @Timeout(30000L)
Review Comment:
In junit5 default unit is seconds while in junit4 is millis.
Do we really need to set timeout for 30000 secs?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047264081
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -428,13 +434,13 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, replicationFactor)));
- result.all().get();
+ result.all().get(1, TimeUnit.MINUTES);
Review Comment:
why do we need this change?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -428,13 +434,13 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, replicationFactor)));
- result.all().get();
+ result.all().get(1, TimeUnit.MINUTES);
}
private void deleteTestTopic(String topic)
throws ExecutionException, InterruptedException, TimeoutException {
final DeleteTopicsResult result = admin.deleteTopics(Collections.singletonList(topic));
- result.all().get();
+ result.all().get(1, TimeUnit.MINUTES);
Review Comment:
why do we need this change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323914415
Hi @snuyanzin
I tried resolving the conflicts (haven't pushed yet), but when I try to build locally getting the following error
```
Too many files with unapproved license: 14 See RAT report in: D:\Projects\
Intellij\git\flink\target\rat.txt
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323924577
what are the changes there?
from my point of view they should not be changed and should not be a part of this PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1291980945
I noticed that classes with `@Parametrized` annotation are not migrated well....
I would suggest to in more incremental approach:
The initial state e.g. master should be pass all the tests
then just convert one test check it passes and commit and so on (in case of test hierarchy it could require more tests to change) however anyway this approach will simplify detection of problem tests and possible issues
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005571712
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
*/
@SuppressWarnings("serial")
@RetryOnFailure(times = 3)
Review Comment:
In javadoc[1] for `RetryOnFailure` is stated that
> \* <p>Add the {@link RetryRule} to your test class and annotate the class and/or tests with {@link
> \* RetryOnFailure}.
which means it makes sense to use it only with junit4. With junit5 there should be used `RetryOnException`
[1] https://github.com/apache/flink/blob/f8c6a668cd2b887f33a0cf4608de2d6b95c71f03/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryOnFailure.java#L30
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005571712
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
*/
@SuppressWarnings("serial")
@RetryOnFailure(times = 3)
Review Comment:
In javadoc[1] for `RetryOnFailure` is stated that
> \* <p>Add the {@link RetryRule} to your test class and annotate the class and/or tests with {@ link
> \* RetryOnFailure}.
which means it makes sense to use it only with junit4. With junit5 there should be used `RetryOnException`
[1] https://github.com/apache/flink/blob/f8c6a668cd2b887f33a0cf4608de2d6b95c71f03/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryOnFailure.java#L30
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
*/
@SuppressWarnings("serial")
@RetryOnFailure(times = 3)
Review Comment:
In javadoc[1] for `RetryOnFailure` is stated that
> \* <p>Add the {@ link RetryRule} to your test class and annotate the class and/or tests with {@ link
> \* RetryOnFailure}.
which means it makes sense to use it only with junit4. With junit5 there should be used `RetryOnException`
[1] https://github.com/apache/flink/blob/f8c6a668cd2b887f33a0cf4608de2d6b95c71f03/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryOnFailure.java#L30
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034896169
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -143,12 +151,13 @@ public static void setupAdmin() {
admin = AdminClient.create(properties);
}
- @AfterClass
+ @AfterAll
public static void teardownAdmin() {
admin.close();
+ KAFKA_CONTAINER.close();
}
- @Before
+ @BeforeEach
public void setUp() throws ExecutionException, InterruptedException, TimeoutException {
Review Comment:
```suggestion
void setUp() throws ExecutionException, InterruptedException, TimeoutException {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034894913
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -108,7 +113,9 @@
import static org.assertj.core.api.Assertions.fail;
/** Tests for using KafkaSink writing to a Kafka cluster. */
-public class KafkaSinkITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaSinkITCase {
Review Comment:
```suggestion
class KafkaSinkITCase {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034893527
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java:
##########
@@ -30,7 +31,8 @@
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link KafkaSinkBuilder}. */
-public class KafkaSinkBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaSinkBuilderTest {
Review Comment:
```suggestion
class KafkaSinkBuilderTest {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034907770
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java:
##########
@@ -53,7 +53,7 @@ public class KafkaRecordDeserializationSchemaTest {
private static Map<String, ?> configuration;
private static boolean isKeyDeserializer;
- @Before
+ @BeforeEach
public void setUp() {
Review Comment:
```suggestion
void setUp() {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1325605146
Hi @snuyanzin,
I have resolved the conflicts. Please review it, whenever you can
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1277859592
By the way to avoid could you please try this command on your branch
```
mvn -DskipTests -Dfast-Pskip-webui-build -T1C clean install
```
on your branch checkout to rule out that it is caused by some mismatch of SNAPSHOT versions of Flink modules
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323920168
Oh not sure why are they here, but this is what I found from the rat.txt logs
```
14 Unknown Licenses
*****************************************************
Files with unapproved licenses:
flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
flink-connectors/flink-connector-files/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot
flink-connectors/flink-connector-files/src/test/resources/committable-serializer-migration/in-progress-v1/committable
flink-connectors/flink-connector-files/src/test/resources/committable-serializer-migration/pending-v1/committable
flink-connectors/flink-file-sink-common/src/test/resources/recoverable-serializer-migration/in-progress-v1/recoverable
flink-connectors/flink-file-sink-common/src/test/resources/recoverable-serializer-migration/pending-v1/recoverable
flink-core/src/test/resources/abstractID-with-toString-field
flink-core/src/test/resources/abstractID-with-toString-field-set
flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v1/snapshot
flink-streaming-java/src/test/resources/bucket-state-migration-test/empty-v2/snapshot
flink-streaming-java/src/test/resources/bucket-state-migration-test/full-no-in-progress-v1-template/snapshot
flink-streaming-java/src/test/resources/bucket-state-migration-test/full-v1-template/snapshot
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047581939
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
By default it takes seconds, do we still need to specify?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
By default it takes seconds, do we still need to specify?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
By default it takes seconds, do we still need to specify?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047270252
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -102,8 +106,9 @@ public void testHappyPath() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L)
- public void testResumeTransaction() throws Exception {
+ @Test
+ @Timeout(30L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
time unit should be specified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047266160
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java:
##########
@@ -37,7 +37,7 @@ public KafkaSourceLegacyITCase() throws Exception {
super(true);
}
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
ok let's change it to protected
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047271538
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testCommitTransactionAfterClosed() {
String topicName = "testCommitTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.commitTransaction();
+ assertThatThrownBy(kafkaProducer::commitTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testResumeTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testResumeTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testAbortTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ // TODO verify why 'The producer 951754235 has already been closed' is coming
+ @Disabled
Review Comment:
why is it disabled?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testCommitTransactionAfterClosed() {
String topicName = "testCommitTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.commitTransaction();
+ assertThatThrownBy(kafkaProducer::commitTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testResumeTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
time unit should be specified
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testCommitTransactionAfterClosed() {
String topicName = "testCommitTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.commitTransaction();
+ assertThatThrownBy(kafkaProducer::commitTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testResumeTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testResumeTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testAbortTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ // TODO verify why 'The producer 951754235 has already been closed' is coming
+ @Disabled
+ void testAbortTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
kafkaProducer.abortTransaction();
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testFlushAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
time unit should be specified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034901013
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -72,14 +73,14 @@ public class KafkaEnumeratorTest {
private static final boolean INCLUDE_DYNAMIC_TOPIC = true;
private static final boolean EXCLUDE_DYNAMIC_TOPIC = false;
- @BeforeClass
+ @BeforeAll
public static void setup() throws Throwable {
Review Comment:
```suggestion
static void setup() throws Throwable {
```
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -72,14 +73,14 @@ public class KafkaEnumeratorTest {
private static final boolean INCLUDE_DYNAMIC_TOPIC = true;
private static final boolean EXCLUDE_DYNAMIC_TOPIC = false;
- @BeforeClass
+ @BeforeAll
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
KafkaSourceTestEnv.setupTopic(TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopic);
KafkaSourceTestEnv.setupTopic(TOPIC2, true, true, KafkaSourceTestEnv::getRecordsForTopic);
}
- @AfterClass
+ @AfterAll
public static void tearDown() throws Exception {
Review Comment:
```suggestion
static void tearDown() throws Exception {
```
package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034899740
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/TransactionToAbortCheckerTest.java:
##########
@@ -17,19 +17,21 @@
package org.apache.flink.connector.kafka.sink;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link TransactionsToAbortChecker}. */
-public class TransactionToAbortCheckerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class TransactionToAbortCheckerTest {
Review Comment:
```suggestion
class TransactionToAbortCheckerTest {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034907187
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberTest.java:
##########
@@ -45,17 +45,19 @@ public class KafkaSubscriberTest {
private static final TopicPartition NON_EXISTING_TOPIC = new TopicPartition("removed", 0);
private static AdminClient adminClient;
- @BeforeClass
+ @BeforeAll
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
KafkaSourceTestEnv.createTestTopic(TOPIC1);
KafkaSourceTestEnv.createTestTopic(TOPIC2);
adminClient = KafkaSourceTestEnv.getAdminClient();
}
- @AfterClass
+ @AfterAll
public static void tearDown() throws Exception {
Review Comment:
```suggestion
static void tearDown() throws Exception {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034891596
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializerTest.java:
##########
@@ -29,7 +30,8 @@
* Tests for serializing and deserialzing {@link KafkaCommittable} with {@link
* KafkaCommittableSerializer}.
*/
-public class KafkaCommittableSerializerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
Review Comment:
Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034904521
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -166,7 +167,8 @@ public void testReaderRegistrationTriggersAssignments() throws Throwable {
}
}
- @Test(timeout = 30000L)
+ @Test
+ @Timeout(30000L)
public void testDiscoverPartitionsPeriodically() throws Throwable {
Review Comment:
```suggestion
void testDiscoverPartitionsPeriodically() throws Throwable {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034906144
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -145,10 +148,14 @@ public void testSpecificOffsetsInitializer() {
}
}
- @Test(expected = IllegalStateException.class)
+ @Test
public void testSpecifiedOffsetsInitializerWithoutOffsetResetStrategy() {
Review Comment:
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1275766548
Thanks for your contribution @ashmeet-kandhari
As I mentioned in jira issue we could continue code related discussions here
Could you please share more detail/link to ci with failure you've mentioned?
also about changes: i think changes of `docs/themes/book` are not necessary for this PR and could be removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323964883
or just cherry-picked commits from this PR to rebased version
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1578500753
Hi @ashmeet-kandhari I wonder whether you going to continue working on this PR or not?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1310983666
Hi @snuyanzin
Can you re-check this PR once
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005622582
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java:
##########
@@ -61,7 +59,7 @@
/** Simple End to End Test for Kafka. */
public class KafkaShuffleITCase extends KafkaShuffleTestBase {
- @Rule public final Timeout timeout = Timeout.millis(600000L);
+ // @RegisterExtension public final Timeout timeout = Timeout.millis(600000L);
Review Comment:
Same here timeout should be migrated as well to junit5
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1044911351
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java:
##########
@@ -37,7 +37,7 @@ public KafkaSourceLegacyITCase() throws Exception {
super(true);
}
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
this method calls KafkaProducerTestBase.prepare();
Which is needs to be protected method
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047583788
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testCommitTransactionAfterClosed() {
String topicName = "testCommitTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.commitTransaction();
+ assertThatThrownBy(kafkaProducer::commitTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testResumeTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testResumeTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testAbortTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ // TODO verify why 'The producer 951754235 has already been closed' is coming
+ @Disabled
Review Comment:
While running this test I was getting the error as mentioned in the TODO, so disabled it for now to discuss about it
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testCommitTransactionAfterClosed() {
String topicName = "testCommitTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.commitTransaction();
+ assertThatThrownBy(kafkaProducer::commitTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testResumeTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testResumeTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testAbortTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ // TODO verify why 'The producer 951754235 has already been closed' is coming
+ @Disabled
+ void testAbortTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
kafkaProducer.abortTransaction();
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testFlushAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
By default it takes seconds, do we still need to specify?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1330839374
hi @ashmeet-kandhari
thanks for conflicts resolution
I left some comments (i didn't go through the whole changeset)
however there are several types of things which would be nice to address
1. Since junit5 classes and tests method could be package private (may be except some cases with hierarchies). So would nice to have it fixed
2. Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here than using annotation `@ExtendWith(TestLoggerExtension.class)` for every class.
3. in junit5 timeout unit by default is seconds while in junit4 it is millis. It looks like this should be taken into account or may be better set it explicitly like `@Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)`
4. I noticed there are tests failing would be nice to understand if it is result of changes within this PR or something else
5. GitHub still shows changes for `docs/themes/book` in this PR. It should be reverted since it has nothing to do with kafka-connector to junit5 migration
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034900090
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java:
##########
@@ -37,7 +37,7 @@ public KafkaSourceLegacyITCase() throws Exception {
super(true);
}
- @BeforeClass
+ @BeforeAll
public static void prepare() throws Exception {
Review Comment:
```suggestion
static void prepare() throws Exception {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034896581
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -158,7 +167,7 @@ public void setUp() throws ExecutionException, InterruptedException, TimeoutExce
createTestTopic(topic, 1, TOPIC_REPLICATION_FACTOR);
}
- @After
+ @AfterEach
public void tearDown() throws ExecutionException, InterruptedException, TimeoutException {
Review Comment:
```suggestion
void tearDown() throws ExecutionException, InterruptedException, TimeoutException {
```
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java:
##########
@@ -49,19 +51,21 @@
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
-public class KafkaTransactionLogITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
Review Comment:
Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034897981
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##########
@@ -77,6 +77,7 @@
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for the standalone KafkaWriter. */
+@Testcontainers
@ExtendWith(TestLoggerExtension.class)
public class KafkaWriterITCase {
Review Comment:
```suggestion
class KafkaWriterITCase {
```
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializerTest.java:
##########
@@ -29,7 +30,8 @@
* Tests for serializing and deserialzing {@link KafkaWriterState} with {@link
* KafkaWriterStateSerializer}.
*/
-public class KafkaWriterStateSerializerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
Review Comment:
Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034897140
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java:
##########
@@ -49,19 +51,21 @@
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
-public class KafkaTransactionLogITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaTransactionLogITCase {
Review Comment:
```suggestion
class KafkaTransactionLogITCase {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034892520
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -45,15 +46,16 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
-public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaRecordSerializationSchemaBuilderTest {
Review Comment:
```suggestion
class KafkaRecordSerializationSchemaBuilderTest {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323917212
it means that there are some new files in your PR without license...
I guess there should be no any new files within this PR.
A list of new files you can find in already mentioned `D:\Projects\Intellij\git\flink\target\rat.txt `
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034900636
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceLegacyITCase.java:
##########
@@ -91,7 +91,7 @@ public void testMultipleSourcesOnePartition() throws Exception {
// --- broker failure ---
@Test
- @Ignore("FLINK-28267")
+ @Disabled("FLINK-28267")
public void testBrokerFailure() throws Exception {
Review Comment:
```suggestion
void testBrokerFailure() throws Exception {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034905020
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -25,24 +25,25 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Unit tests for {@link OffsetsInitializer}. */
public class OffsetsInitializerTest {
Review Comment:
```suggestion
class OffsetsInitializerTest {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1323938824
Doesn't show any diff,
I tried checking the history as well and shows nothing
![image](https://user-images.githubusercontent.com/37675804/203368442-7e42d3cb-8e2c-4a49-a195-aec35b66da7c.png)
Maybe I will need to cancel merge and try again?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1272375576
<!--
Meta data
{
"version" : 1,
"metaDataEntries" : [ {
"hash" : "ff2e4a8ff3a6444267e2a2c9f03a4e3a057e9619",
"status" : "UNKNOWN",
"url" : "TBD",
"triggerID" : "ff2e4a8ff3a6444267e2a2c9f03a4e3a057e9619",
"triggerType" : "PUSH"
} ]
}-->
## CI report:
* ff2e4a8ff3a6444267e2a2c9f03a4e3a057e9619 UNKNOWN
<details>
<summary>Bot commands</summary>
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1354073289
Hi @snuyanzin ,
I have given you access to my repo.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047271926
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testCommitTransactionAfterClosed() {
String topicName = "testCommitTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.commitTransaction();
+ assertThatThrownBy(kafkaProducer::commitTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testResumeTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testResumeTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testAbortTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ // TODO verify why 'The producer 951754235 has already been closed' is coming
+ @Disabled
+ void testAbortTransactionAfterClosed() {
String topicName = "testAbortTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
kafkaProducer.abortTransaction();
- kafkaProducer.resumeTransaction(0L, (short) 1);
+ assertThatThrownBy(() -> kafkaProducer.resumeTransaction(0L, (short) 1))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testFlushAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testFlushAfterClosed() {
String topicName = "testCommitTransactionAfterClosed";
FlinkKafkaInternalProducer<String, String> kafkaProducer = getClosedProducer(topicName);
- kafkaProducer.flush();
+ assertThatThrownBy(kafkaProducer::flush).isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L)
- public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
+ @Test
+ @Timeout(30L)
Review Comment:
time unit should be specified
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047272871
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java:
##########
@@ -68,7 +68,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
protected KeyedSerializationSchema<Integer> integerKeyedSerializationSchema =
new KeyedSerializationSchemaWrapper<>(integerSerializationSchema);
- @Before
+ @BeforeEach
public void before() {
Review Comment:
could be package private i guess
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java:
##########
@@ -171,7 +171,7 @@ public void testRestoreToCheckpointAfterExceedingProducersPool() throws Exceptio
/** This test hangs when running it in your IDE. */
@Test
- @Ignore
+ @Disabled
public void testFlinkKafkaProducerFailBeforeNotify() throws Exception {
Review Comment:
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1345624195
> hi @ashmeet-kandhari thanks for conflicts resolution I left some comments (i didn't go through the whole changeset) however there are several types of things which would be nice to address
>
> 1. Since junit5 classes and tests method could be package private (may be except some cases with hierarchies). So would nice to have it fixed
> 2. Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here than using annotation `@ExtendWith(TestLoggerExtension.class)` for every class.
> 3. in junit5 timeout unit by default is seconds while in junit4 it is millis. It looks like this should be taken into account or may be better set it explicitly like `@Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)`
> 4. I noticed there are tests failing would be nice to understand if it is result of changes within this PR or something else
> 5. GitHub still shows changes for `docs/themes/book` in this PR. It should be reverted since it has nothing to do with kafka-connector to junit5 migration
Hi @snuyanzin
I have addressed the first 3 points from the above comments.
Regarding the rest of the comments
* _4th Comment:_ The from what I could see the tests failed for 2 reasons
1. Compilation failed and suggestion was to run `mvn spotless:apply`. But I always run it before committing it. I did test now with the new changes made locally and the tests run fine. So we can see if this comes up again
2. `docs_404_checks` stage failed with the error mentioned below. Not sure what this means as I haven't changed anything there
> Error: Error building site: "/home/vsts/work/1/s/docs/content.zh/_index.md:36:1": failed to extract shortcode: template for shortcode "columns" not found
* _5th Comment:_ I might need some help on how to revert the change for that. As I cannot see anything in intellij that points to changes made in that folder
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Re: [PR] [FLINK-25538][flink-connector-kafka] JUnit5 Migration [flink]
Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser closed pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
URL: https://github.com/apache/flink/pull/20991
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1006434961
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java:
##########
@@ -38,16 +39,16 @@
* <p>For regenerating the binary snapshot files run {@link #writeSnapshot()} on the corresponding
* Flink release-* branch.
*/
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
- @Parameterized.Parameters(name = "Migration Savepoint: {0}")
+ @Parameters(name = "Migration Savepoint: {0}")
public static Collection<FlinkVersion> parameters() {
return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_15);
}
- public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
- super(testMigrateVersion);
- }
+ // public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
+ // super(testMigrateVersion);
+ // }
Review Comment:
I wanted to check before I removed it.
Do you think we still need the constructor after migration to junit 5 or can we remove it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1006431695
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java:
##########
@@ -79,7 +79,8 @@
*/
@SuppressWarnings("serial")
@RetryOnFailure(times = 3)
Review Comment:
Hi @snuyanzin
After going through the `RetryOnException` java docs it also mentions the same thing
> Annotation to use with {@link org.apache.flink.testutils.junit.RetryRule}.
>
> <p>Add the {@link org.apache.flink.testutils.junit.RetryRule} to your test class and annotate the
> class and/or tests with {@link RetryOnException}.
I also went through the `RetryExtension` source code and looks like it reads from both the annotations (RetryOnException, RetryOnFailure)
Do you still think it makes sense to migrate to `RetryOnException` annotation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005612119
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleExactlyOnceITCase.java:
##########
@@ -39,7 +37,7 @@
/** Failure Recovery IT Test for KafkaShuffle. */
public class KafkaShuffleExactlyOnceITCase extends KafkaShuffleTestBase {
- @Rule public final Timeout timeout = Timeout.millis(600000L);
+ // @RegisterExtension public final Timeout timeout = Timeout.millis(600000L);
Review Comment:
Well to have same behavior for junit5 timeout should be propagated to all the tests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005624592
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java:
##########
@@ -38,16 +39,16 @@
* <p>For regenerating the binary snapshot files run {@link #writeSnapshot()} on the corresponding
* Flink release-* branch.
*/
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
- @Parameterized.Parameters(name = "Migration Savepoint: {0}")
+ @Parameters(name = "Migration Savepoint: {0}")
public static Collection<FlinkVersion> parameters() {
return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_15);
}
- public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
- super(testMigrateVersion);
- }
+ // public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
+ // super(testMigrateVersion);
+ // }
Review Comment:
do we really need to have it commented in repo?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1005629556
##########
flink-connectors/flink-connector-kafka/archunit-violations/97dda445-f6bc-43e2-8106-5876ca0cd052:
##########
@@ -26,12 +26,6 @@ org.apache.flink.connector.kafka.source.KafkaSourceITCase does not satisfy: only
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
Review Comment:
you can just remove a corresponding git commit and it should be enough
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034905365
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -25,24 +25,25 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Unit tests for {@link OffsetsInitializer}. */
public class OffsetsInitializerTest {
private static final String TOPIC = "topic";
private static final String TOPIC2 = "topic2";
private static KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl retriever;
- @BeforeClass
+ @BeforeAll
public static void setup() throws Throwable {
Review Comment:
```suggestion
static void setup() throws Throwable {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034891884
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommittableSerializerTest.java:
##########
@@ -29,7 +30,8 @@
* Tests for serializing and deserialzing {@link KafkaCommittable} with {@link
* KafkaCommittableSerializer}.
*/
-public class KafkaCommittableSerializerTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+public class KafkaCommittableSerializerTest {
Review Comment:
```suggestion
class KafkaCommittableSerializerTest {
```
could be package private
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -45,15 +46,16 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
-public class KafkaRecordSerializationSchemaBuilderTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
Review Comment:
Using the `META-INF/services/org.junit.jupiter.api.extension.Extension` is the better approach here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1034895369
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -123,18 +130,19 @@ public class KafkaSinkITCase extends TestLogger {
private SharedReference<AtomicBoolean> failed;
private SharedReference<AtomicLong> lastCheckpointedRecord;
- @ClassRule
+ @Container
public static final KafkaContainer KAFKA_CONTAINER =
createKafkaContainer(KAFKA, LOG)
.withEmbeddedZookeeper()
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
- @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+ @RegisterExtension
+ public final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
- @Rule public final TemporaryFolder temp = new TemporaryFolder();
+ @TempDir public Path temp;
- @BeforeClass
+ @BeforeAll
public static void setupAdmin() {
Review Comment:
```suggestion
static void setupAdmin() {
```
could be package private
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1342895879
Hi @snuyanzin,
I am on vacation and didn't have access to my laptop.
Will address the comments by this weekend.
Thanks for waiting :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047578778
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -428,13 +434,13 @@ private void createTestTopic(String topic, int numPartitions, short replicationF
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions, replicationFactor)));
- result.all().get();
+ result.all().get(1, TimeUnit.MINUTES);
Review Comment:
Don't remember exactly why I added tha
will remove it for now
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on a diff in pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on code in PR #20991:
URL: https://github.com/apache/flink/pull/20991#discussion_r1047582360
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -148,62 +153,78 @@ public void testResumeTransaction() throws Exception {
deleteTestTopic(topicName);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testPartitionsForAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testPartitionsForAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.partitionsFor("Topic");
+ assertThatThrownBy(() -> kafkaProducer.partitionsFor("Topic"))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testInitTransactionsAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testInitTransactionsAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.initTransactions();
+ assertThatThrownBy(kafkaProducer::initTransactions)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testBeginTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
+ void testBeginTransactionAfterClosed() {
FlinkKafkaInternalProducer<String, String> kafkaProducer =
new FlinkKafkaInternalProducer<>(extraProperties);
kafkaProducer.initTransactions();
kafkaProducer.close(Duration.ofSeconds(5));
- kafkaProducer.beginTransaction();
+ assertThatThrownBy(kafkaProducer::beginTransaction)
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(timeout = 30000L, expected = IllegalStateException.class)
- public void testCommitTransactionAfterClosed() {
+ @Test
+ @Timeout(30L)
Review Comment:
By default it takes seconds, do we still need to specify?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
ashmeet-kandhari commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1354074250
Hi @snuyanzin
I am recently encountering issue, where I am not able to run tests.
Any idea about this
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) on project flink-connector-kafka: Execution default-test of goal org.apache.m
aven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' failed to discover tests: com.tngtech.archunit.library
.freeze.FreezingArchRule.allowEmptyShould(Z)Lcom/tngtech/archunit/lang/ArchRule; -> [Help 1]
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [flink] snuyanzin commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #20991:
URL: https://github.com/apache/flink/pull/20991#issuecomment-1354510303
I see lots of different issues and it's quite tricky to debug all of them...
I would suggest to get back to the stage when all the tests are passing e.g. i pick first commit from this PR and rebased to latest master currently it is in my branch (https://github.com/snuyanzin/flink/commit/57cd7804fdd88666e7dfab8049549a6d739ec505). With others there are already issues.
The proposal is pick this commit and go step by step (or test by test) till the first error and then first fix the error (all the tests should be green). Then can continue with the next test
what do you think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org