You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/12/23 03:05:03 UTC

[flink-connector-jdbc] branch main updated: [FLINK-27940][tests] Migrate tests to junit5

This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ec3873  [FLINK-27940][tests] Migrate tests to junit5
5ec3873 is described below

commit 5ec38735e5ac2b12620e91ce4977b805fb04c5b8
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Fri Dec 23 04:04:58 2022 +0100

    [FLINK-27940][tests] Migrate tests to junit5
    
    This closes #12
---
 .../connector/jdbc/JdbcConnectionOptionsTest.java  |  48 ++---
 .../flink/connector/jdbc/JdbcDataTestBase.java     |   6 +-
 .../flink/connector/jdbc/JdbcDataTypeTest.java     |  18 +-
 .../apache/flink/connector/jdbc/JdbcITCase.java    |   6 +-
 .../flink/connector/jdbc/JdbcInputFormatTest.java  | 197 ++++++++++++---------
 .../connector/jdbc/JdbcRowOutputFormatTest.java    |  45 +++--
 .../apache/flink/connector/jdbc/JdbcTestBase.java  |   8 +-
 .../jdbc/catalog/JdbcCatalogUtilsTest.java         |  20 ++-
 .../connector/jdbc/catalog/MySqlCatalogITCase.java |  86 ++++-----
 .../jdbc/catalog/MySqlCatalogTestBase.java         |  16 +-
 .../jdbc/catalog/PostgresCatalogITCase.java        |  28 +--
 .../jdbc/catalog/PostgresCatalogTest.java          |  76 ++++----
 .../jdbc/catalog/PostgresCatalogTestBase.java      |  20 +--
 .../jdbc/catalog/PostgresTablePathTest.java        |   6 +-
 .../catalog/factory/JdbcCatalogFactoryTest.java    |  20 ++-
 .../converter/AbstractJdbcRowConverterTest.java    |   6 +-
 .../jdbc/dialect/mysql/MySqlDialectTest.java       |   2 +-
 .../oracle/OraclePreparedStatementTest.java        |  16 +-
 .../jdbc/dialect/oracle/OracleTableSinkITCase.java |  28 +--
 .../dialect/oracle/OracleTableSourceITCase.java    |  28 +--
 .../sqlserver/SqlServerTableSinkITCase.java        |  36 ++--
 .../sqlserver/SqlServerTableSourceITCase.java      |  28 +--
 .../connector/jdbc/internal/JdbcFullTest.java      |  16 +-
 .../jdbc/internal/JdbcTableOutputFormatTest.java   |  20 +--
 ...ProviderDriverClassConcurrentLoadingITCase.java |  11 +-
 .../SimpleJdbcConnectionProviderTest.java          |  20 +--
 .../NumericBetweenParametersProviderTest.java      |  16 +-
 .../FieldNamedPreparedStatementImplTest.java       |  16 +-
 .../jdbc/table/JdbcAppendOnlyWriterTest.java       |  81 +++++----
 .../jdbc/table/JdbcDynamicTableFactoryTest.java    |  20 +--
 .../jdbc/table/JdbcDynamicTableSinkITCase.java     |  28 +--
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   |   2 +-
 ...FilterPushdownPreparedStatementVisitorTest.java |  35 ++--
 .../connector/jdbc/table/JdbcLookupTestBase.java   |   6 +-
 .../connector/jdbc/table/JdbcOutputFormatTest.java |  32 ++--
 .../jdbc/table/JdbcRowDataInputFormatTest.java     | 186 ++++++++++---------
 .../jdbc/table/JdbcRowDataLookupFunctionTest.java  |   4 +-
 .../connector/jdbc/table/JdbcTablePlanTest.java    |   2 +-
 .../jdbc/table/UnsignedTypeConversionITCase.java   |  14 +-
 .../connector/jdbc/utils/JdbcTypeUtilTest.java     |   6 +-
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java        |  37 ++--
 .../connector/jdbc/xa/JdbcXaFacadeImplTest.java    |   8 +-
 .../connector/jdbc/xa/JdbcXaSinkDerbyTest.java     |  28 +--
 .../flink/connector/jdbc/xa/JdbcXaSinkH2Test.java  |  12 +-
 .../connector/jdbc/xa/JdbcXaSinkMigrationTest.java |  23 +--
 .../jdbc/xa/JdbcXaSinkNoInsertionTest.java         |  10 +-
 .../connector/jdbc/xa/JdbcXaSinkTestBase.java      |  14 +-
 .../jdbc/xa/SemanticXidGeneratorTest.java          |   8 +-
 .../flink/connector/jdbc/xa/XidImplTest.java       |   8 +-
 49 files changed, 740 insertions(+), 667 deletions(-)

diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java
index 7419c98..985f05a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java
@@ -19,35 +19,43 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link JdbcConnectionOptions}. */
-public class JdbcConnectionOptionsTest {
-    @Test(expected = NullPointerException.class)
-    public void testNullUrl() {
-        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-                .withUrl(null)
-                .withUsername("user")
-                .withPassword("password")
-                .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
-                .build();
+class JdbcConnectionOptionsTest {
+    @Test
+    void testNullUrl() {
+        assertThatThrownBy(
+                        () ->
+                                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                                        .withUrl(null)
+                                        .withUsername("user")
+                                        .withPassword("password")
+                                        .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
+                                        .build())
+                .isInstanceOf(NullPointerException.class);
     }
 
     @Test
-    public void testNoOptionalOptions() {
+    void testNoOptionalOptions() {
         new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                 .withUrl(FakeDBUtils.TEST_DB_URL)
                 .build();
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidCheckTimeoutSeconds() {
-        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-                .withUrl(FakeDBUtils.TEST_DB_URL)
-                .withUsername("user")
-                .withPassword("password")
-                .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
-                .withConnectionCheckTimeoutSeconds(0)
-                .build();
+    @Test
+    void testInvalidCheckTimeoutSeconds() {
+        assertThatThrownBy(
+                        () ->
+                                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                                        .withUrl(FakeDBUtils.TEST_DB_URL)
+                                        .withUsername("user")
+                                        .withPassword("password")
+                                        .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
+                                        .withConnectionCheckTimeoutSeconds(0)
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class);
     }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
index 2eb0de7..003cf57 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.types.Row;
 
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
 import org.mockito.Mockito;
 
 import java.sql.SQLException;
@@ -38,8 +38,8 @@ import static org.mockito.Mockito.doReturn;
  * and inserts data before each test.
  */
 public abstract class JdbcDataTestBase extends JdbcTestBase {
-    @Before
-    public void initData() throws SQLException {
+    @BeforeEach
+    void initData() throws SQLException {
         JdbcTestFixture.initData(getDbMetadata());
     }
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
index d3c35c5..75be19a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.annotation.Nullable;
 
@@ -35,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for all DataTypes and Dialects of JDBC connector. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class JdbcDataTypeTest {
 
     private static final String DDL_FORMAT =
@@ -49,7 +51,7 @@ public class JdbcDataTypeTest {
                     + "  'table-name'='myTable'\n"
                     + ")";
 
-    @Parameterized.Parameters(name = "{index}: {0}")
+    @Parameters(name = "{0}")
     public static List<TestItem> testData() {
         return Arrays.asList(
                 createTestItem("derby", "CHAR"),
@@ -178,10 +180,10 @@ public class JdbcDataTypeTest {
         return item;
     }
 
-    @Parameterized.Parameter public TestItem testItem;
+    @Parameter public TestItem testItem;
 
-    @Test
-    public void testDataTypeValidate() {
+    @TestTemplate
+    void testDataTypeValidate() {
         String sqlDDL = String.format(DDL_FORMAT, testItem.dataTypeExpr, testItem.dialect);
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
index c24fc56..d1a830b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.function.FunctionWithException;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.Serializable;
 import java.sql.Connection;
@@ -63,7 +63,7 @@ public class JdbcITCase extends JdbcTestBase {
             };
 
     @Test
-    public void testInsert() throws Exception {
+    void testInsert() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
         env.setParallelism(1);
@@ -82,7 +82,7 @@ public class JdbcITCase extends JdbcTestBase {
     }
 
     @Test
-    public void testObjectReuse() throws Exception {
+    void testObjectReuse() throws Exception {
         Configuration configuration = new Configuration();
         configuration.set(OBJECT_REUSE, true);
         StreamExecutionEnvironment env =
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
index 197fb63..92c0e79 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
@@ -24,10 +24,8 @@ import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.types.Row;
 
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -44,16 +42,15 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_EMPTY;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link JdbcInputFormat}. */
-public class JdbcInputFormatTest extends JdbcDataTestBase {
-
-    @Rule public ExpectedException thrown = ExpectedException.none();
+class JdbcInputFormatTest extends JdbcDataTestBase {
 
     private JdbcInputFormat jdbcInputFormat;
 
-    @After
-    public void tearDown() throws IOException {
+    @AfterEach
+    void tearDown() throws IOException {
         if (jdbcInputFormat != null) {
             jdbcInputFormat.close();
             jdbcInputFormat.closeInputFormat();
@@ -62,92 +59,117 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testUntypedRowInfo() throws IOException {
-        thrown.expect(NullPointerException.class);
-        thrown.expectMessage("No RowTypeInfo supplied");
-        jdbcInputFormat =
-                JdbcInputFormat.buildJdbcInputFormat()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .finish();
-        jdbcInputFormat.openInputFormat();
+    void testUntypedRowInfo() {
+        assertThatThrownBy(
+                        () -> {
+                            jdbcInputFormat =
+                                    JdbcInputFormat.buildJdbcInputFormat()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .finish();
+                            jdbcInputFormat.openInputFormat();
+                        })
+                .isInstanceOf(NullPointerException.class)
+                .hasMessage("No RowTypeInfo supplied");
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidDriver() throws IOException {
-        jdbcInputFormat =
-                JdbcInputFormat.buildJdbcInputFormat()
-                        .setDrivername("org.apache.derby.jdbc.idontexist")
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .setRowTypeInfo(ROW_TYPE_INFO)
-                        .finish();
-        jdbcInputFormat.openInputFormat();
+    @Test
+    void testInvalidDriver() {
+        assertThatThrownBy(
+                        () -> {
+                            jdbcInputFormat =
+                                    JdbcInputFormat.buildJdbcInputFormat()
+                                            .setDrivername("org.apache.derby.jdbc.idontexist")
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .setRowTypeInfo(ROW_TYPE_INFO)
+                                            .finish();
+                            jdbcInputFormat.openInputFormat();
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidURL() throws IOException {
-        jdbcInputFormat =
-                JdbcInputFormat.buildJdbcInputFormat()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .setRowTypeInfo(ROW_TYPE_INFO)
-                        .finish();
-        jdbcInputFormat.openInputFormat();
+    @Test
+    void testInvalidURL() {
+        assertThatThrownBy(
+                        () -> {
+                            jdbcInputFormat =
+                                    JdbcInputFormat.buildJdbcInputFormat()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .setRowTypeInfo(ROW_TYPE_INFO)
+                                            .finish();
+                            jdbcInputFormat.openInputFormat();
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidQuery() throws IOException {
-        jdbcInputFormat =
-                JdbcInputFormat.buildJdbcInputFormat()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setQuery("iamnotsql")
-                        .setRowTypeInfo(ROW_TYPE_INFO)
-                        .finish();
-        jdbcInputFormat.openInputFormat();
+    @Test
+    void testInvalidQuery() {
+        assertThatThrownBy(
+                        () -> {
+                            jdbcInputFormat =
+                                    JdbcInputFormat.buildJdbcInputFormat()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setQuery("iamnotsql")
+                                            .setRowTypeInfo(ROW_TYPE_INFO)
+                                            .finish();
+                            jdbcInputFormat.openInputFormat();
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testNoUrl() throws IOException {
-        thrown.expect(NullPointerException.class);
-        thrown.expectMessage("jdbc url is empty");
-        jdbcInputFormat =
-                JdbcInputFormat.buildJdbcInputFormat()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .setRowTypeInfo(ROW_TYPE_INFO)
-                        .finish();
+    void testNoUrl() {
+        assertThatThrownBy(
+                        () -> {
+                            jdbcInputFormat =
+                                    JdbcInputFormat.buildJdbcInputFormat()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .setRowTypeInfo(ROW_TYPE_INFO)
+                                            .finish();
+                        })
+                .isInstanceOf(NullPointerException.class)
+                .hasMessage("jdbc url is empty");
     }
 
     @Test
-    public void testNoQuery() throws IOException {
-        thrown.expect(NullPointerException.class);
-        thrown.expectMessage("No query supplied");
-        jdbcInputFormat =
-                JdbcInputFormat.buildJdbcInputFormat()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setRowTypeInfo(ROW_TYPE_INFO)
-                        .finish();
+    void testNoQuery() {
+        assertThatThrownBy(
+                        () -> {
+                            jdbcInputFormat =
+                                    JdbcInputFormat.buildJdbcInputFormat()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setRowTypeInfo(ROW_TYPE_INFO)
+                                            .finish();
+                        })
+                .isInstanceOf(NullPointerException.class)
+                .hasMessage("No query supplied");
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidFetchSize() {
-        jdbcInputFormat =
-                JdbcInputFormat.buildJdbcInputFormat()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .setRowTypeInfo(ROW_TYPE_INFO)
-                        .setFetchSize(-7)
-                        .finish();
+    @Test
+    void testInvalidFetchSize() {
+        assertThatThrownBy(
+                        () -> {
+                            jdbcInputFormat =
+                                    JdbcInputFormat.buildJdbcInputFormat()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .setRowTypeInfo(ROW_TYPE_INFO)
+                                            .setFetchSize(-7)
+                                            .finish();
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testValidFetchSizeIntegerMin() {
+    void testValidFetchSizeIntegerMin() {
         jdbcInputFormat =
                 JdbcInputFormat.buildJdbcInputFormat()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -159,7 +181,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise()
+    void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise()
             throws SQLException, ClassNotFoundException {
         jdbcInputFormat =
                 JdbcInputFormat.buildJdbcInputFormat()
@@ -180,7 +202,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testFetchSizeCanBeConfigured() throws SQLException {
+    void testFetchSizeCanBeConfigured() throws SQLException {
         final int desiredFetchSize = 10_000;
         jdbcInputFormat =
                 JdbcInputFormat.buildJdbcInputFormat()
@@ -195,7 +217,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testDefaultAutoCommitIsUsedIfNotConfiguredOtherwise()
+    void testDefaultAutoCommitIsUsedIfNotConfiguredOtherwise()
             throws SQLException, ClassNotFoundException {
 
         jdbcInputFormat =
@@ -215,7 +237,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testAutoCommitCanBeConfigured() throws SQLException {
+    void testAutoCommitCanBeConfigured() throws SQLException {
 
         final boolean desiredAutoCommit = false;
         jdbcInputFormat =
@@ -232,7 +254,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcInputFormatWithoutParallelism() throws IOException {
+    void testJdbcInputFormatWithoutParallelism() throws IOException {
         jdbcInputFormat =
                 JdbcInputFormat.buildJdbcInputFormat()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -260,7 +282,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
+    void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
         final int fetchSize = 1;
         final long min = TEST_DATA[0].id;
         final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
@@ -298,8 +320,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting()
-            throws IOException {
+    void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
         final long min = TEST_DATA[0].id;
         final long max = TEST_DATA[TEST_DATA.length - 1].id;
         final long fetchSize = max + 1; // generate a single split
@@ -337,7 +358,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException {
+    void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException {
         Serializable[][] queryParameters = new String[2][1];
         queryParameters[0] = new String[] {TEST_DATA[3].author};
         queryParameters[1] = new String[] {TEST_DATA[0].author};
@@ -383,7 +404,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testEmptyResults() throws IOException {
+    void testEmptyResults() throws IOException {
         jdbcInputFormat =
                 JdbcInputFormat.buildJdbcInputFormat()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
index 5f21362..3150a15 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.types.Row;
 
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -51,20 +51,26 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link JdbcRowOutputFormat}. */
-public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
+class JdbcRowOutputFormatTest extends JdbcDataTestBase {
 
     private JdbcRowOutputFormat jdbcOutputFormat;
 
-    @After
-    public void tearDown() throws IOException {
+    @AfterEach
+    void tearDown() throws Exception {
         if (jdbcOutputFormat != null) {
             jdbcOutputFormat.close();
         }
         jdbcOutputFormat = null;
+
+        Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
+        try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
+                Statement stat = conn.createStatement()) {
+            stat.execute("DELETE FROM " + OUTPUT_TABLE);
+        }
     }
 
     @Test
-    public void testInvalidDriver() {
+    void testInvalidDriver() {
         String expectedMsg = "unable to open JDBC writer";
         try {
             jdbcOutputFormat =
@@ -81,7 +87,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testInvalidURL() {
+    void testInvalidURL() {
         String expectedMsg = "No suitable driver found for jdbc:der:iamanerror:mory:ebookshop";
 
         jdbcOutputFormat =
@@ -96,7 +102,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testInvalidQuery() {
+    void testInvalidQuery() {
         String expectedMsg = "unable to open JDBC writer";
         try {
             jdbcOutputFormat =
@@ -114,7 +120,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testIncompleteConfiguration() {
+    void testIncompleteConfiguration() {
         String expectedMsg = "jdbc url is empty";
         try {
             jdbcOutputFormat =
@@ -129,7 +135,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testIncompatibleTypes() {
+    void testIncompatibleTypes() {
         String expectedMsg = "Invalid character string format for type INTEGER.";
         try {
             jdbcOutputFormat =
@@ -157,7 +163,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testExceptionOnInvalidType() {
+    void testExceptionOnInvalidType() {
         String expectedMsg = "field index: 3, field value: 0.";
         try {
             jdbcOutputFormat =
@@ -193,7 +199,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testExceptionOnClose() {
+    void testExceptionOnClose() {
         String expectedMsg = "Writing records to JDBC failed.";
         try {
             jdbcOutputFormat =
@@ -232,7 +238,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcOutputFormat() throws IOException, SQLException {
+    void testJdbcOutputFormat() throws IOException, SQLException {
         jdbcOutputFormat =
                 JdbcRowOutputFormat.buildJdbcOutputFormat()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -266,7 +272,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testFlush() throws SQLException, IOException {
+    void testFlush() throws SQLException, IOException {
         jdbcOutputFormat =
                 JdbcRowOutputFormat.buildJdbcOutputFormat()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -306,7 +312,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
+    void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
         jdbcOutputFormat =
                 JdbcRowOutputFormat.buildJdbcOutputFormat()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -346,13 +352,4 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
             assertThat(recordCount).isEqualTo(TEST_DATA.length);
         }
     }
-
-    @After
-    public void clearOutputTable() throws Exception {
-        Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
-        try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
-                Statement stat = conn.createStatement()) {
-            stat.execute("DELETE FROM " + OUTPUT_TABLE);
-        }
-    }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
index 88daae4..1214a08 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.connector.jdbc;
 
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 
 /**
  * Base class for JDBC test using DDL from {@link JdbcTestFixture}. It uses create tables before
@@ -26,12 +26,12 @@ import org.junit.Before;
  */
 public abstract class JdbcTestBase {
 
-    @Before
+    @BeforeEach
     public void before() throws Exception {
         JdbcTestFixture.initSchema(getDbMetadata());
     }
 
-    @After
+    @AfterEach
     public void after() throws Exception {
         JdbcTestFixture.cleanupData(getDbMetadata().getUrl());
         JdbcTestFixture.cleanUpDatabasesStatic(getDbMetadata());
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java
index 087009e..2340714 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java
@@ -18,24 +18,26 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link JdbcCatalogUtils}. */
-public class JdbcCatalogUtilsTest {
-    @Rule public ExpectedException exception = ExpectedException.none();
+class JdbcCatalogUtilsTest {
 
     @Test
-    public void testJdbcUrl() {
+    void testJdbcUrl() {
         JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432/");
 
         JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432");
     }
 
     @Test
-    public void testInvalidJdbcUrl() {
-        exception.expect(IllegalArgumentException.class);
-        JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432/db");
+    void testInvalidJdbcUrl() {
+        assertThatThrownBy(
+                        () ->
+                                JdbcCatalogUtils.validateJdbcUrl(
+                                        "jdbc:postgresql://localhost:5432/db"))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
index c0b398e..9d57400 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
@@ -26,22 +26,24 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -51,7 +53,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** E2E test for {@link MySqlCatalog}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class MySqlCatalogITCase extends MySqlCatalogTestBase {
 
     private static final List<Row> ALL_TYPES_ROWS =
@@ -154,8 +156,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
     private final MySqlCatalog catalog;
     private TableEnvironment tEnv;
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
         tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
@@ -168,15 +170,15 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
         catalog = CATALOGS.get(version);
     }
 
-    @Parameterized.Parameters(name = "version = {0}")
-    public static String[] params() {
-        return DOCKER_IMAGE_NAMES.toArray(new String[0]);
+    @Parameters(name = "version = {0}")
+    public static Collection<String> params() {
+        return DOCKER_IMAGE_NAMES;
     }
 
     // ------ databases ------
 
-    @Test
-    public void testGetDb_DatabaseNotExistException() throws Exception {
+    @TestTemplate
+    void testGetDb_DatabaseNotExistException() throws Exception {
         String databaseNotExist = "nonexistent";
         assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist))
                 .satisfies(
@@ -187,14 +189,14 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                                         databaseNotExist)));
     }
 
-    @Test
-    public void testListDatabases() {
+    @TestTemplate
+    void testListDatabases() {
         List<String> actual = catalog.listDatabases();
         assertThat(actual).containsExactly(TEST_DB, TEST_DB2);
     }
 
-    @Test
-    public void testDbExists() throws Exception {
+    @TestTemplate
+    void testDbExists() throws Exception {
         String databaseNotExist = "nonexistent";
         assertThat(catalog.databaseExists(databaseNotExist)).isFalse();
         assertThat(catalog.databaseExists(TEST_DB)).isTrue();
@@ -202,8 +204,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
 
     // ------ tables ------
 
-    @Test
-    public void testListTables() throws DatabaseNotExistException {
+    @TestTemplate
+    void testListTables() throws DatabaseNotExistException {
         List<String> actual = catalog.listTables(TEST_DB);
         assertThat(actual)
                 .isEqualTo(
@@ -214,8 +216,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                                 TEST_TABLE_PK));
     }
 
-    @Test
-    public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
+    @TestTemplate
+    void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
         String anyDatabase = "anyDatabase";
         assertThatThrownBy(() -> catalog.listTables(anyDatabase))
                 .satisfies(
@@ -225,15 +227,15 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                                         "Database %s does not exist in Catalog", anyDatabase)));
     }
 
-    @Test
-    public void testTableExists() {
+    @TestTemplate
+    void testTableExists() {
         String tableNotExist = "nonexist";
         assertThat(catalog.tableExists(new ObjectPath(TEST_DB, tableNotExist))).isFalse();
         assertThat(catalog.tableExists(new ObjectPath(TEST_DB, TEST_TABLE_ALL_TYPES))).isTrue();
     }
 
-    @Test
-    public void testGetTables_TableNotExistException() throws TableNotExistException {
+    @TestTemplate
+    void testGetTables_TableNotExistException() throws TableNotExistException {
         String anyTableNotExist = "anyTable";
         assertThatThrownBy(() -> catalog.getTable(new ObjectPath(TEST_DB, anyTableNotExist)))
                 .satisfies(
@@ -244,8 +246,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                                         TEST_DB, anyTableNotExist)));
     }
 
-    @Test
-    public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
+    @TestTemplate
+    void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
         String databaseNotExist = "nonexistdb";
         String tableNotExist = "anyTable";
         assertThatThrownBy(() -> catalog.getTable(new ObjectPath(databaseNotExist, tableNotExist)))
@@ -257,14 +259,14 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                                         databaseNotExist, tableNotExist)));
     }
 
-    @Test
-    public void testGetTable() throws TableNotExistException {
+    @TestTemplate
+    void testGetTable() throws TableNotExistException {
         CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE_ALL_TYPES));
         assertThat(table.getUnresolvedSchema()).isEqualTo(TABLE_SCHEMA);
     }
 
-    @Test
-    public void testGetTablePrimaryKey() throws TableNotExistException {
+    @TestTemplate
+    void testGetTablePrimaryKey() throws TableNotExistException {
         // test the PK of test.t_user
         Schema tableSchemaTestPK1 =
                 Schema.newBuilder()
@@ -290,8 +292,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
 
     // ------ test select query. ------
 
-    @Test
-    public void testSelectField() {
+    @TestTemplate
+    void testSelectField() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(String.format("select pid from %s", TEST_TABLE_ALL_TYPES))
@@ -303,8 +305,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                                 Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 2L)));
     }
 
-    @Test
-    public void testWithoutCatalogDB() {
+    @TestTemplate
+    void testWithoutCatalogDB() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(String.format("select * from %s", TEST_TABLE_ALL_TYPES))
@@ -314,8 +316,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
         assertThat(results).isEqualTo(ALL_TYPES_ROWS);
     }
 
-    @Test
-    public void testWithoutCatalog() {
+    @TestTemplate
+    void testWithoutCatalog() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(
@@ -327,8 +329,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
         assertThat(results).isEqualTo(ALL_TYPES_ROWS);
     }
 
-    @Test
-    public void testFullPath() {
+    @TestTemplate
+    void testFullPath() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(
@@ -342,8 +344,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
         assertThat(results).isEqualTo(ALL_TYPES_ROWS);
     }
 
-    @Test
-    public void testSelectToInsert() throws Exception {
+    @TestTemplate
+    void testSelectToInsert() throws Exception {
 
         String sql =
                 String.format(
@@ -359,8 +361,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
         assertThat(results).isEqualTo(ALL_TYPES_ROWS);
     }
 
-    @Test
-    public void testGroupByInsert() throws Exception {
+    @TestTemplate
+    void testGroupByInsert() throws Exception {
         // Changes primary key for the next record.
         tEnv.executeSql(
                         String.format(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
index d92c89e..c58dcf4 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.api.Schema;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MySQLContainer;
@@ -38,12 +38,12 @@ import java.util.List;
 import java.util.Map;
 
 /** Test base for {@link MySqlCatalog}. */
-public class MySqlCatalogTestBase {
+class MySqlCatalogTestBase {
 
     public static final Logger LOG = LoggerFactory.getLogger(MySqlCatalogTestBase.class);
 
     protected static final List<String> DOCKER_IMAGE_NAMES =
-            Arrays.asList("mysql:5.6.51", "mysql:5.7.34", "mysql:8.0.16");
+            Arrays.asList("mysql:5.6.51", "mysql:5.7.40", "mysql:8.0.31");
     protected static final String TEST_CATALOG_NAME = "mysql_catalog";
     protected static final String TEST_USERNAME = "mysql";
     protected static final String TEST_PWD = "mysql";
@@ -114,8 +114,8 @@ public class MySqlCatalogTestBase {
     public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new HashMap<>();
     public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>();
 
-    @BeforeClass
-    public static void beforeAll() throws SQLException {
+    @BeforeAll
+    static void beforeAll() throws SQLException {
         for (String dockerImageName : DOCKER_IMAGE_NAMES) {
             MySQLContainer<?> container =
                     new MySQLContainer<>(DockerImageName.parse(dockerImageName))
@@ -140,8 +140,8 @@ public class MySqlCatalogTestBase {
         }
     }
 
-    @AfterClass
-    public static void cleanup() {
+    @AfterAll
+    static void cleanup() {
         for (MySQLContainer<?> container : MYSQL_CONTAINERS.values()) {
             container.stop();
         }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index 78d142d..925f75d 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
@@ -33,12 +33,12 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** E2E test for {@link PostgresCatalog}. */
-public class PostgresCatalogITCase extends PostgresCatalogTestBase {
+class PostgresCatalogITCase extends PostgresCatalogTestBase {
 
     private TableEnvironment tEnv;
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
         tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
 
@@ -48,7 +48,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testSelectField() {
+    void testSelectField() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(String.format("select id from %s", TABLE1))
@@ -58,7 +58,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testWithoutSchema() {
+    void testWithoutSchema() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(String.format("select * from %s", TABLE1))
@@ -68,7 +68,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testWithSchema() {
+    void testWithSchema() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(
@@ -81,7 +81,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testFullPath() {
+    void testFullPath() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(
@@ -96,7 +96,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testInsert() throws Exception {
+    void testInsert() throws Exception {
         tEnv.executeSql(String.format("insert into %s select * from `%s`", TABLE4, TABLE1)).await();
 
         List<Row> results =
@@ -108,7 +108,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testGroupByInsert() throws Exception {
+    void testGroupByInsert() throws Exception {
         tEnv.executeSql(
                         String.format(
                                 "insert into `%s` "
@@ -131,7 +131,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testPrimitiveTypes() {
+    void testPrimitiveTypes() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE))
@@ -144,7 +144,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testArrayTypes() {
+    void testArrayTypes() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(String.format("select * from %s", TABLE_ARRAY_TYPE))
@@ -176,7 +176,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testSerialTypes() {
+    void testSerialTypes() {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery(String.format("select * from %s", TABLE_SERIAL_TYPE))
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
index a5a9419..4286a49 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
@@ -24,34 +24,35 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link PostgresCatalog}. */
-public class PostgresCatalogTest extends PostgresCatalogTestBase {
+class PostgresCatalogTest extends PostgresCatalogTestBase {
 
     // ------ databases ------
 
     @Test
-    public void testGetDb_DatabaseNotExistException() throws Exception {
-        exception.expect(DatabaseNotExistException.class);
-        exception.expectMessage("Database nonexistent does not exist in Catalog");
-        catalog.getDatabase("nonexistent");
+    void testGetDb_DatabaseNotExistException() {
+        assertThatThrownBy(() -> catalog.getDatabase("nonexistent"))
+                .isInstanceOf(DatabaseNotExistException.class)
+                .hasMessageContaining("Database nonexistent does not exist in Catalog");
     }
 
     @Test
-    public void testListDatabases() {
+    void testListDatabases() {
         List<String> actual = catalog.listDatabases();
 
         assertThat(actual).isEqualTo(Arrays.asList("postgres", "test"));
     }
 
     @Test
-    public void testDbExists() throws Exception {
+    void testDbExists() {
         assertThat(catalog.databaseExists("nonexistent")).isFalse();
 
         assertThat(catalog.databaseExists(PostgresCatalog.DEFAULT_DATABASE)).isTrue();
@@ -60,7 +61,7 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
     // ------ tables ------
 
     @Test
-    public void testListTables() throws DatabaseNotExistException {
+    void testListTables() throws DatabaseNotExistException {
         List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
 
         assertThat(actual)
@@ -80,13 +81,13 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
-        exception.expect(DatabaseNotExistException.class);
-        catalog.listTables("postgres/nonexistschema");
+    void testListTables_DatabaseNotExistException() {
+        assertThatThrownBy(() -> catalog.listTables("postgres/nonexistschema"))
+                .isInstanceOf(DatabaseNotExistException.class);
     }
 
     @Test
-    public void testTableExists() {
+    void testTableExists() {
         assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist"))).isFalse();
 
         assertThat(catalog.tableExists(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE1)))
@@ -96,32 +97,43 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testGetTables_TableNotExistException() throws TableNotExistException {
-        exception.expect(TableNotExistException.class);
-        catalog.getTable(
-                new ObjectPath(
-                        TEST_DB, PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+    void testGetTables_TableNotExistException() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                TEST_DB,
+                                                PostgresTablePath.toFlinkTableName(
+                                                        TEST_SCHEMA, "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
     }
 
     @Test
-    public void testGetTables_TableNotExistException_NoSchema() throws TableNotExistException {
-        exception.expect(TableNotExistException.class);
-        catalog.getTable(
-                new ObjectPath(
-                        TEST_DB, PostgresTablePath.toFlinkTableName("nonexistschema", "anytable")));
+    void testGetTables_TableNotExistException_NoSchema() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                TEST_DB,
+                                                PostgresTablePath.toFlinkTableName(
+                                                        "nonexistschema", "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
     }
 
     @Test
-    public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
-        exception.expect(TableNotExistException.class);
-        catalog.getTable(
-                new ObjectPath(
-                        "nonexistdb", PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+    void testGetTables_TableNotExistException_NoDb() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                "nonexistdb",
+                                                PostgresTablePath.toFlinkTableName(
+                                                        TEST_SCHEMA, "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
     }
 
     @Test
-    public void testGetTable()
-            throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
+    void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
         // test postgres.public.user1
         Schema schema = getSimpleTable().schema;
 
@@ -149,7 +161,7 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testPrimitiveDataTypes() throws TableNotExistException {
+    void testPrimitiveDataTypes() throws TableNotExistException {
         CatalogBaseTable table =
                 catalog.getTable(
                         new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE));
@@ -158,7 +170,7 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
     }
 
     @Test
-    public void testArrayDataTypes() throws TableNotExistException {
+    void testArrayDataTypes() throws TableNotExistException {
         CatalogBaseTable table =
                 catalog.getTable(
                         new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE));
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 38a423d..2666bbc 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -23,14 +23,13 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.types.logical.DecimalType;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.BeforeAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 import org.testcontainers.utility.DockerImageName;
 
 import java.sql.Connection;
@@ -39,12 +38,11 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 /** Test base for {@link PostgresCatalog}. */
-public class PostgresCatalogTestBase {
+@Testcontainers
+class PostgresCatalogTestBase {
 
     public static final Logger LOG = LoggerFactory.getLogger(PostgresCatalogTestBase.class);
 
-    @Rule public ExpectedException exception = ExpectedException.none();
-
     protected static final DockerImageName POSTGRES_IMAGE =
             DockerImageName.parse(DockerImageVersions.POSTGRES);
 
@@ -66,15 +64,15 @@ public class PostgresCatalogTestBase {
     protected static String baseUrl;
     protected static PostgresCatalog catalog;
 
-    @ClassRule
-    public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
+    @Container
+    static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
             new PostgreSQLContainer<>(POSTGRES_IMAGE)
                     .withUsername(TEST_USERNAME)
                     .withPassword(TEST_PWD)
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
 
-    @BeforeClass
-    public static void init() throws SQLException {
+    @BeforeAll
+    static void init() throws SQLException {
         // jdbc:postgresql://localhost:50807/postgres?user=postgres
         String jdbcUrl = POSTGRES_CONTAINER.getJdbcUrl();
         // jdbc:postgresql://localhost:50807/
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePathTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePathTest.java
index db63c34..61314af 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePathTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePathTest.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link PostgresTablePath}. */
-public class PostgresTablePathTest {
+class PostgresTablePathTest {
     @Test
-    public void testFromFlinkTableName() {
+    void testFromFlinkTableName() {
         assertThat(PostgresTablePath.fromFlinkTableName("public.topic"))
                 .isEqualTo(new PostgresTablePath("public", "topic"));
     }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
index 55dda31..e3aaefc 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
@@ -25,13 +25,14 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.flink.table.factories.FactoryUtil;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 import org.testcontainers.utility.DockerImageName;
 
 import java.sql.SQLException;
@@ -41,7 +42,8 @@ import java.util.Map;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link JdbcCatalogFactory}. */
-public class JdbcCatalogFactoryTest {
+@Testcontainers
+class JdbcCatalogFactoryTest {
 
     public static final Logger LOG = LoggerFactory.getLogger(JdbcCatalogFactoryTest.class);
 
@@ -55,15 +57,15 @@ public class JdbcCatalogFactoryTest {
     protected static final DockerImageName POSTGRES_IMAGE =
             DockerImageName.parse(DockerImageVersions.POSTGRES);
 
-    @ClassRule
-    public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
+    @Container
+    static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
             new PostgreSQLContainer<>(POSTGRES_IMAGE)
                     .withUsername(TEST_USERNAME)
                     .withPassword(TEST_PWD)
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
 
-    @BeforeClass
-    public static void setup() throws SQLException {
+    @BeforeAll
+    static void setup() throws SQLException {
         // jdbc:postgresql://localhost:50807/postgres?user=postgres
         String jdbcUrl = POSTGRES_CONTAINER.getJdbcUrl();
         // jdbc:postgresql://localhost:50807/
@@ -80,7 +82,7 @@ public class JdbcCatalogFactoryTest {
     }
 
     @Test
-    public void test() {
+    void test() {
         final Map<String, String> options = new HashMap<>();
         options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER);
         options.put(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
index 670b39c..9f9fee1 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.sql.ResultSet;
@@ -32,10 +32,10 @@ import java.time.LocalDateTime;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link AbstractJdbcRowConverter}. */
-public class AbstractJdbcRowConverterTest {
+class AbstractJdbcRowConverterTest {
 
     @Test
-    public void testExternalLocalDateTimeToTimestamp() throws Exception {
+    void testExternalLocalDateTimeToTimestamp() throws Exception {
         RowType rowType = RowType.of(new IntType(), new TimestampType(3));
         JdbcRowConverter rowConverter =
                 new AbstractJdbcRowConverter(rowType) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
index 006cc76..614020f 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
@@ -23,7 +23,7 @@ import org.junit.jupiter.api.Test;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link MySqlDialect}. */
-public class MySqlDialectTest {
+class MySqlDialectTest {
 
     @Test
     void testAppendDefaultUrlProperties() {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
index b4827b6..79dee14 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.List;
@@ -33,7 +33,7 @@ import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link OraclePreparedStatementTest}. */
-public class OraclePreparedStatementTest {
+class OraclePreparedStatementTest {
 
     private final JdbcDialect dialect =
             JdbcDialectLoader.load(
@@ -44,7 +44,7 @@ public class OraclePreparedStatementTest {
     private final String tableName = "tbl";
 
     @Test
-    public void testInsertStatement() {
+    void testInsertStatement() {
         String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
         assertThat(insertStmt)
                 .isEqualTo(
@@ -64,7 +64,7 @@ public class OraclePreparedStatementTest {
     }
 
     @Test
-    public void testDeleteStatement() {
+    void testDeleteStatement() {
         String deleteStmt = dialect.getDeleteStatement(tableName, keyFields);
         assertThat(deleteStmt)
                 .isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
@@ -75,7 +75,7 @@ public class OraclePreparedStatementTest {
     }
 
     @Test
-    public void testRowExistsStatement() {
+    void testRowExistsStatement() {
         String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
         assertThat(rowExistStmt)
                 .isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
@@ -86,7 +86,7 @@ public class OraclePreparedStatementTest {
     }
 
     @Test
-    public void testUpdateStatement() {
+    void testUpdateStatement() {
         String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields);
         assertThat(updateStmt)
                 .isEqualTo(
@@ -107,7 +107,7 @@ public class OraclePreparedStatementTest {
     }
 
     @Test
-    public void testUpsertStatement() {
+    void testUpsertStatement() {
         String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get();
         assertThat(upsertStmt)
                 .isEqualTo(
@@ -135,7 +135,7 @@ public class OraclePreparedStatementTest {
     }
 
     @Test
-    public void testSelectStatement() {
+    void testSelectStatement() {
         String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
         assertThat(selectStmt)
                 .isEqualTo(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java
index be8a6f8..8c9da92 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java
@@ -45,9 +45,9 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -66,7 +66,7 @@ import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 
 /** The Table Sink ITCase for {@link OracleDialect}. */
-public class OracleTableSinkITCase extends AbstractTestBase {
+class OracleTableSinkITCase extends AbstractTestBase {
 
     private static final OracleContainer container = new OracleContainer();
     private static String containerUrl;
@@ -78,8 +78,8 @@ public class OracleTableSinkITCase extends AbstractTestBase {
     public static final String OUTPUT_TABLE5 = "checkpointTable";
     public static final String USER_TABLE = "USER_TABLE";
 
-    @BeforeClass
-    public static void beforeAll() throws ClassNotFoundException, SQLException {
+    @BeforeAll
+    static void beforeAll() throws ClassNotFoundException, SQLException {
         container.start();
         containerUrl = container.getJdbcUrl();
         Class.forName(container.getDriverClassName());
@@ -128,8 +128,8 @@ public class OracleTableSinkITCase extends AbstractTestBase {
         }
     }
 
-    @AfterClass
-    public static void afterAll() throws Exception {
+    @AfterAll
+    static void afterAll() throws Exception {
         TestValuesTableFactory.clearAllData();
         Class.forName(container.getDriverClassName());
         try (Connection conn = DriverManager.getConnection(containerUrl);
@@ -181,7 +181,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testReal() throws Exception {
+    void testReal() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         StreamTableEnvironment tEnv =
@@ -205,7 +205,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testUpsert() throws Exception {
+    void testUpsert() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -271,7 +271,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testAppend() throws Exception {
+    void testAppend() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         env.getConfig().setParallelism(1);
@@ -312,7 +312,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testBatchSink() throws Exception {
+    void testBatchSink() throws Exception {
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
 
         tEnv.executeSql(
@@ -355,7 +355,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testReadingFromChangelogSource() throws Exception {
+    void testReadingFromChangelogSource() throws Exception {
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
         String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
         tEnv.executeSql(
@@ -420,7 +420,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testFlushBufferWhenCheckpoint() throws Exception {
+    void testFlushBufferWhenCheckpoint() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "jdbc");
         options.put("url", containerUrl);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java
index 4c0a15d..eb340c0 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java
@@ -25,10 +25,10 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -44,7 +44,7 @@ import java.util.stream.Stream;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The Table Source ITCase for {@link OracleDialect}. */
-public class OracleTableSourceITCase extends AbstractTestBase {
+class OracleTableSourceITCase extends AbstractTestBase {
 
     private static final OracleContainer container = new OracleContainer();
     private static String containerUrl;
@@ -53,8 +53,8 @@ public class OracleTableSourceITCase extends AbstractTestBase {
     private static StreamExecutionEnvironment env;
     private static TableEnvironment tEnv;
 
-    @BeforeClass
-    public static void beforeAll() throws ClassNotFoundException, SQLException {
+    @BeforeAll
+    static void beforeAll() throws ClassNotFoundException, SQLException {
         container.start();
         containerUrl = container.getJdbcUrl();
         Class.forName(container.getDriverClassName());
@@ -96,8 +96,8 @@ public class OracleTableSourceITCase extends AbstractTestBase {
         }
     }
 
-    @AfterClass
-    public static void afterAll() throws Exception {
+    @AfterAll
+    static void afterAll() throws Exception {
         Class.forName(container.getDriverClassName());
         try (Connection conn = DriverManager.getConnection(containerUrl);
                 Statement statement = conn.createStatement()) {
@@ -106,14 +106,14 @@ public class OracleTableSourceITCase extends AbstractTestBase {
         container.stop();
     }
 
-    @Before
-    public void before() throws Exception {
+    @BeforeEach
+    void before() throws Exception {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         tEnv = StreamTableEnvironment.create(env);
     }
 
     @Test
-    public void testJdbcSource() throws Exception {
+    void testJdbcSource() throws Exception {
         tEnv.executeSql(
                 "CREATE TABLE "
                         + INPUT_TABLE
@@ -158,7 +158,7 @@ public class OracleTableSourceITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testProject() throws Exception {
+    void testProject() throws Exception {
         tEnv.executeSql(
                 "CREATE TABLE "
                         + INPUT_TABLE
@@ -201,7 +201,7 @@ public class OracleTableSourceITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testLimit() throws Exception {
+    void testLimit() throws Exception {
         tEnv.executeSql(
                 "CREATE TABLE "
                         + INPUT_TABLE
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
index 0227a58..b40466b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
@@ -45,10 +45,12 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -67,11 +69,14 @@ import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 
 /** The Table Sink ITCase for {@link SqlServerDialect}. */
-public class SqlServerTableSinkITCase extends AbstractTestBase {
+@Testcontainers
+class SqlServerTableSinkITCase extends AbstractTestBase {
 
-    private static final MSSQLServerContainer container =
+    @Container
+    private static final MSSQLServerContainer<?> container =
             new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
                     .acceptLicense();
+
     private static String containerUrl;
 
     public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
@@ -81,9 +86,8 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
     public static final String OUTPUT_TABLE5 = "checkpointTable";
     public static final String USER_TABLE = "USER_TABLE";
 
-    @BeforeClass
-    public static void beforeAll() throws ClassNotFoundException, SQLException {
-        container.start();
+    @BeforeAll
+    static void beforeAll() throws ClassNotFoundException, SQLException {
         containerUrl =
                 String.format(
                         "%s;username=%s;password=%s",
@@ -136,8 +140,8 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
         }
     }
 
-    @AfterClass
-    public static void afterAll() throws Exception {
+    @AfterAll
+    static void afterAll() throws Exception {
         TestValuesTableFactory.clearAllData();
         Class.forName(container.getDriverClassName());
         try (Connection conn =
@@ -191,7 +195,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testReal() throws Exception {
+    void testReal() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         StreamTableEnvironment tEnv =
@@ -221,7 +225,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testUpsert() throws Exception {
+    void testUpsert() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -293,7 +297,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testAppend() throws Exception {
+    void testAppend() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         env.getConfig().setParallelism(1);
@@ -340,7 +344,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testBatchSink() throws Exception {
+    void testBatchSink() throws Exception {
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
 
         tEnv.executeSql(
@@ -389,7 +393,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testReadingFromChangelogSource() throws Exception {
+    void testReadingFromChangelogSource() throws Exception {
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
         String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
         tEnv.executeSql(
@@ -460,7 +464,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testFlushBufferWhenCheckpoint() throws Exception {
+    void testFlushBufferWhenCheckpoint() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "jdbc");
         options.put("url", containerUrl);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java
index 2c9c73b..9abfe54 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java
@@ -25,10 +25,10 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.MSSQLServerContainer;
 
 import java.sql.Connection;
@@ -43,7 +43,7 @@ import java.util.stream.Stream;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** The Table Source ITCase for {@link SqlServerDialect}. */
-public class SqlServerTableSourceITCase extends AbstractTestBase {
+class SqlServerTableSourceITCase extends AbstractTestBase {
 
     private static final MSSQLServerContainer container =
             new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
@@ -54,8 +54,8 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
     private static StreamExecutionEnvironment env;
     private static TableEnvironment tEnv;
 
-    @BeforeClass
-    public static void beforeAll() throws ClassNotFoundException, SQLException {
+    @BeforeAll
+    static void beforeAll() throws ClassNotFoundException, SQLException {
         container.start();
         containerUrl = container.getJdbcUrl();
         Class.forName(container.getDriverClassName());
@@ -106,8 +106,8 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
         }
     }
 
-    @AfterClass
-    public static void afterAll() throws Exception {
+    @AfterAll
+    static void afterAll() throws Exception {
         Class.forName(container.getDriverClassName());
         try (Connection conn =
                         DriverManager.getConnection(
@@ -118,14 +118,14 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
         container.stop();
     }
 
-    @Before
-    public void before() throws Exception {
+    @BeforeEach
+    void before() throws Exception {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         tEnv = StreamTableEnvironment.create(env);
     }
 
     @Test
-    public void testJdbcSource() throws Exception {
+    void testJdbcSource() throws Exception {
         createFlinkTable();
         Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE).collect();
         List<String> result =
@@ -149,7 +149,7 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testProject() throws Exception {
+    void testProject() throws Exception {
         createFlinkTable();
         Iterator<Row> collected =
                 tEnv.executeSql("SELECT id,datetime_col,decimal_col FROM " + INPUT_TABLE).collect();
@@ -168,7 +168,7 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testFilter() throws Exception {
+    void testFilter() throws Exception {
         createFlinkTable();
         Iterator<Row> collected =
                 tEnv.executeSql(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index b1904f3..e92a9ff 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -34,8 +34,8 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
 import org.apache.flink.types.Row;
 
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.sql.Connection;
@@ -60,20 +60,20 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.doReturn;
 
 /** Tests using both {@link JdbcInputFormat} and {@link JdbcOutputFormat}. */
-public class JdbcFullTest extends JdbcDataTestBase {
+class JdbcFullTest extends JdbcDataTestBase {
 
     @Test
-    public void testWithoutParallelism() throws Exception {
+    void testWithoutParallelism() throws Exception {
         runTest(false);
     }
 
     @Test
-    public void testWithParallelism() throws Exception {
+    void testWithParallelism() throws Exception {
         runTest(true);
     }
 
     @Test
-    public void testEnrichedClassCastException() {
+    void testEnrichedClassCastException() {
         String expectedMsg = "field index: 3, field value: 11.11.";
         try {
             JdbcOutputFormat jdbcOutputFormat =
@@ -173,8 +173,8 @@ public class JdbcFullTest extends JdbcDataTestBase {
         }
     }
 
-    @After
-    public void clearOutputTable() throws Exception {
+    @AfterEach
+    void clearOutputTable() throws Exception {
         Class.forName(getDbMetadata().getDriverClass());
         try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
                 Statement stat = conn.createStatement()) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index 9e684e3..938dcd1 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -29,9 +29,9 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.types.Row;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.sql.Connection;
@@ -57,14 +57,14 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
     private String[] fieldNames;
     private String[] keyFields;
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         fieldNames = new String[] {"id", "title", "author", "price", "qty"};
         keyFields = new String[] {"id"};
     }
 
     @Test
-    public void testUpsertFormatCloseBeforeOpen() throws Exception {
+    void testUpsertFormatCloseBeforeOpen() throws Exception {
         JdbcConnectorOptions options =
                 JdbcConnectorOptions.builder()
                         .setDBUrl(getDbMetadata().getUrl())
@@ -91,7 +91,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
      * JdbcOutputFormat#attemptFlush()} fails.
      */
     @Test
-    public void testDeleteExecutorUpdatedOnReconnect() throws Exception {
+    void testDeleteExecutorUpdatedOnReconnect() throws Exception {
         // first fail flush from the main executor
         boolean[] exceptionThrown = {false};
         // then record whether the delete executor was updated
@@ -171,7 +171,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcOutputFormat() throws Exception {
+    void testJdbcOutputFormat() throws Exception {
         JdbcConnectorOptions options =
                 JdbcConnectorOptions.builder()
                         .setDBUrl(getDbMetadata().getUrl())
@@ -246,8 +246,8 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
         }
     }
 
-    @After
-    public void clearOutputTable() throws Exception {
+    @AfterEach
+    void clearOutputTable() throws Exception {
         if (format != null) {
             format.close();
         }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase.java
index ffd2e5e..5fb3676 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase.java
@@ -22,11 +22,13 @@ import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;
 import org.apache.flink.core.testutils.CheckedThread;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -35,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat;
  * This test deals with sql driver class loading issues; run as an ITCase so it won't be interfered
  * with by other tests.
  */
-public class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase {
+class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase {
     private static boolean isClassLoaded(ClassLoader classLoader, String className)
             throws Exception {
         do {
@@ -50,8 +52,9 @@ public class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase {
         return false;
     }
 
-    @Test(timeout = 5000)
-    public void testDriverClassConcurrentLoading() throws Exception {
+    @Test
+    @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
+    void testDriverClassConcurrentLoading() throws Exception {
         ClassLoader classLoader = getClass().getClassLoader();
 
         assertThat(isClassLoaded(classLoader, FakeDBUtils.DRIVER1_CLASS_NAME)).isFalse();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderTest.java
index 1e19299..cf99e72 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection1;
 import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection2;
 import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection3;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.sql.Connection;
 import java.sql.Driver;
@@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link SimpleJdbcConnectionProvider}. */
-public class SimpleJdbcConnectionProviderTest {
+class SimpleJdbcConnectionProviderTest {
 
     private static JdbcConnectionProvider newFakeConnectionProviderWithDriverName(
             String driverName) {
@@ -59,7 +59,7 @@ public class SimpleJdbcConnectionProviderTest {
     }
 
     @Test
-    public void testEstablishConnection() throws Exception {
+    void testEstablishConnection() throws Exception {
         JdbcConnectionProvider provider = newFakeConnectionProvider();
         assertThat(provider.getConnection()).isNull();
         assertThat(provider.isConnectionValid()).isFalse();
@@ -75,7 +75,7 @@ public class SimpleJdbcConnectionProviderTest {
     }
 
     @Test
-    public void testEstablishConnectionWithoutDriverName() throws Exception {
+    void testEstablishConnectionWithoutDriverName() throws Exception {
         JdbcConnectionProvider provider = newProvider(FakeDBUtils.TEST_DB_URL, null);
         assertThat(provider.getConnection()).isNull();
         assertThat(provider.isConnectionValid()).isFalse();
@@ -93,7 +93,7 @@ public class SimpleJdbcConnectionProviderTest {
     }
 
     @Test
-    public void testEstablishDriverConnection() throws Exception {
+    void testEstablishDriverConnection() throws Exception {
         JdbcConnectionProvider provider1 =
                 newFakeConnectionProviderWithDriverName(FakeDBUtils.DRIVER1_CLASS_NAME);
         Connection connection1 = provider1.getOrEstablishConnection();
@@ -106,7 +106,7 @@ public class SimpleJdbcConnectionProviderTest {
     }
 
     @Test
-    public void testEstablishUnregisteredDriverConnection() throws Exception {
+    void testEstablishUnregisteredDriverConnection() throws Exception {
         String unregisteredDriverName = FakeDBUtils.DRIVER3_CLASS_NAME;
         Set<String> registeredDriverNames =
                 Collections.list(DriverManager.getDrivers()).stream()
@@ -122,7 +122,7 @@ public class SimpleJdbcConnectionProviderTest {
     }
 
     @Test
-    public void testInvalidDriverUrl() {
+    void testInvalidDriverUrl() {
         JdbcConnectionProvider provider =
                 newProvider(FakeDBUtils.TEST_DB_INVALID_URL, FakeDBUtils.DRIVER1_CLASS_NAME);
 
@@ -133,7 +133,7 @@ public class SimpleJdbcConnectionProviderTest {
     }
 
     @Test
-    public void testCloseNullConnection() throws Exception {
+    void testCloseNullConnection() throws Exception {
         JdbcConnectionProvider provider = newFakeConnectionProvider();
         provider.closeConnection();
         assertThat(provider.getConnection()).isNull();
@@ -141,7 +141,7 @@ public class SimpleJdbcConnectionProviderTest {
     }
 
     @Test
-    public void testCloseConnection() throws Exception {
+    void testCloseConnection() throws Exception {
         JdbcConnectionProvider provider = newFakeConnectionProvider();
 
         Connection connection1 = provider.getOrEstablishConnection();
@@ -160,7 +160,7 @@ public class SimpleJdbcConnectionProviderTest {
     }
 
     @Test
-    public void testReestablishCachedConnection() throws Exception {
+    void testReestablishCachedConnection() throws Exception {
         JdbcConnectionProvider provider = newFakeConnectionProvider();
 
         Connection connection1 = provider.reestablishConnection();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
index cb1ebf3..25e350e 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.connector.jdbc.split;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.Serializable;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link JdbcNumericBetweenParametersProvider}. */
-public class NumericBetweenParametersProviderTest {
+class NumericBetweenParametersProviderTest {
 
     @Test
-    public void testBatchSizeDivisible() {
+    void testBatchSizeDivisible() {
         JdbcNumericBetweenParametersProvider provider =
                 new JdbcNumericBetweenParametersProvider(-5, 9).ofBatchSize(3);
         Serializable[][] actual = provider.getParameterValues();
@@ -44,7 +44,7 @@ public class NumericBetweenParametersProviderTest {
     }
 
     @Test
-    public void testBatchSizeNotDivisible() {
+    void testBatchSizeNotDivisible() {
         JdbcNumericBetweenParametersProvider provider =
                 new JdbcNumericBetweenParametersProvider(-5, 11).ofBatchSize(4);
         Serializable[][] actual = provider.getParameterValues();
@@ -60,7 +60,7 @@ public class NumericBetweenParametersProviderTest {
     }
 
     @Test
-    public void testBatchSizeTooLarge() {
+    void testBatchSizeTooLarge() {
         JdbcNumericBetweenParametersProvider provider =
                 new JdbcNumericBetweenParametersProvider(0, 2).ofBatchSize(5);
         Serializable[][] actual = provider.getParameterValues();
@@ -70,7 +70,7 @@ public class NumericBetweenParametersProviderTest {
     }
 
     @Test
-    public void testBatchNumDivisible() {
+    void testBatchNumDivisible() {
         JdbcNumericBetweenParametersProvider provider =
                 new JdbcNumericBetweenParametersProvider(-5, 9).ofBatchNum(5);
         Serializable[][] actual = provider.getParameterValues();
@@ -86,7 +86,7 @@ public class NumericBetweenParametersProviderTest {
     }
 
     @Test
-    public void testBatchNumNotDivisible() {
+    void testBatchNumNotDivisible() {
         JdbcNumericBetweenParametersProvider provider =
                 new JdbcNumericBetweenParametersProvider(-5, 11).ofBatchNum(5);
         Serializable[][] actual = provider.getParameterValues();
@@ -102,7 +102,7 @@ public class NumericBetweenParametersProviderTest {
     }
 
     @Test
-    public void testBatchNumTooLarge() {
+    void testBatchNumTooLarge() {
         JdbcNumericBetweenParametersProvider provider =
                 new JdbcNumericBetweenParametersProvider(0, 2).ofBatchNum(5);
         Serializable[][] actual = provider.getParameterValues();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
index 013c711..b1d5684 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.jdbc.statement;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.List;
@@ -32,7 +32,7 @@ import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FieldNamedPreparedStatementImpl}. */
-public class FieldNamedPreparedStatementImplTest {
+class FieldNamedPreparedStatementImplTest {
 
     private final JdbcDialect dialect =
             JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", getClass().getClassLoader());
@@ -42,7 +42,7 @@ public class FieldNamedPreparedStatementImplTest {
     private final String tableName = "tbl";
 
     @Test
-    public void testInsertStatement() {
+    void testInsertStatement() {
         String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
         assertThat(insertStmt)
                 .isEqualTo(
@@ -62,7 +62,7 @@ public class FieldNamedPreparedStatementImplTest {
     }
 
     @Test
-    public void testDeleteStatement() {
+    void testDeleteStatement() {
         String deleteStmt = dialect.getDeleteStatement(tableName, keyFields);
         assertThat(deleteStmt)
                 .isEqualTo("DELETE FROM `tbl` WHERE `id` = :id AND `__field_3__` = :__field_3__");
@@ -73,7 +73,7 @@ public class FieldNamedPreparedStatementImplTest {
     }
 
     @Test
-    public void testRowExistsStatement() {
+    void testRowExistsStatement() {
         String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
         assertThat(rowExistStmt)
                 .isEqualTo("SELECT 1 FROM `tbl` WHERE `id` = :id AND `__field_3__` = :__field_3__");
@@ -84,7 +84,7 @@ public class FieldNamedPreparedStatementImplTest {
     }
 
     @Test
-    public void testUpdateStatement() {
+    void testUpdateStatement() {
         String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields);
         assertThat(updateStmt)
                 .isEqualTo(
@@ -105,7 +105,7 @@ public class FieldNamedPreparedStatementImplTest {
     }
 
     @Test
-    public void testUpsertStatement() {
+    void testUpsertStatement() {
         String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get();
         assertThat(upsertStmt)
                 .isEqualTo(
@@ -130,7 +130,7 @@ public class FieldNamedPreparedStatementImplTest {
     }
 
     @Test
-    public void testSelectStatement() {
+    void testSelectStatement() {
         String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
         assertThat(selectStmt)
                 .isEqualTo(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
index 66b0cc4..34ad9e1 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
@@ -27,9 +27,9 @@ import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -42,50 +42,57 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.doReturn;
 
 /** Test for the Append only mode. */
-public class JdbcAppendOnlyWriterTest extends JdbcTestBase {
+class JdbcAppendOnlyWriterTest extends JdbcTestBase {
 
     private JdbcOutputFormat format;
     private String[] fieldNames;
 
-    @Before
-    public void setup() {
+    @BeforeEach
+    void setup() {
         fieldNames = new String[] {"id", "title", "author", "price", "qty"};
     }
 
-    @Test(expected = IOException.class)
-    public void testMaxRetry() throws Exception {
-        format =
-                JdbcOutputFormat.builder()
-                        .setOptions(
-                                JdbcConnectorOptions.builder()
-                                        .setDBUrl(getDbMetadata().getUrl())
-                                        .setDialect(
-                                                JdbcDialectLoader.load(
-                                                        getDbMetadata().getUrl(),
-                                                        getClass().getClassLoader()))
-                                        .setTableName(OUTPUT_TABLE)
-                                        .build())
-                        .setFieldNames(fieldNames)
-                        .setKeyFields(null)
-                        .build();
-        RuntimeContext context = Mockito.mock(RuntimeContext.class);
-        ExecutionConfig config = Mockito.mock(ExecutionConfig.class);
-        doReturn(config).when(context).getExecutionConfig();
-        doReturn(true).when(config).isObjectReuseEnabled();
-        format.setRuntimeContext(context);
-        format.open(0, 1);
+    @Test
+    void testMaxRetry() throws Exception {
+        assertThatThrownBy(
+                        () -> {
+                            format =
+                                    JdbcOutputFormat.builder()
+                                            .setOptions(
+                                                    JdbcConnectorOptions.builder()
+                                                            .setDBUrl(getDbMetadata().getUrl())
+                                                            .setDialect(
+                                                                    JdbcDialectLoader.load(
+                                                                            getDbMetadata()
+                                                                                    .getUrl(),
+                                                                            getClass()
+                                                                                    .getClassLoader()))
+                                                            .setTableName(OUTPUT_TABLE)
+                                                            .build())
+                                            .setFieldNames(fieldNames)
+                                            .setKeyFields(null)
+                                            .build();
+                            RuntimeContext context = Mockito.mock(RuntimeContext.class);
+                            ExecutionConfig config = Mockito.mock(ExecutionConfig.class);
+                            doReturn(config).when(context).getExecutionConfig();
+                            doReturn(true).when(config).isObjectReuseEnabled();
+                            format.setRuntimeContext(context);
+                            format.open(0, 1);
 
-        // alter table schema to trigger retry logic after failure.
-        alterTable();
-        for (TestEntry entry : TEST_DATA) {
-            format.writeRecord(Tuple2.of(true, toRow(entry)));
-        }
+                            // alter table schema to trigger retry logic after failure.
+                            alterTable();
+                            for (TestEntry entry : TEST_DATA) {
+                                format.writeRecord(Tuple2.of(true, toRow(entry)));
+                            }
 
-        // after retry default times, throws a BatchUpdateException.
-        format.flush();
+                            // after retry default times, throws a BatchUpdateException.
+                            format.flush();
+                        })
+                .isInstanceOf(IOException.class);
     }
 
     private void alterTable() throws Exception {
@@ -96,8 +103,8 @@ public class JdbcAppendOnlyWriterTest extends JdbcTestBase {
         }
     }
 
-    @After
-    public void clear() throws Exception {
+    @AfterEach
+    void clear() throws Exception {
         if (format != null) {
             try {
                 format.close();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
index 13617ef..44d4d65 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -49,7 +49,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
  * Test for {@link JdbcDynamicTableSource} and {@link JdbcDynamicTableSink} created by {@link
  * JdbcDynamicTableFactory}.
  */
-public class JdbcDynamicTableFactoryTest {
+class JdbcDynamicTableFactoryTest {
 
     private static final ResolvedSchema SCHEMA =
             new ResolvedSchema(
@@ -63,7 +63,7 @@ public class JdbcDynamicTableFactoryTest {
                     UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa")));
 
     @Test
-    public void testJdbcCommonProperties() {
+    void testJdbcCommonProperties() {
         Map<String, String> properties = getAllOptions();
         properties.put("driver", "org.apache.derby.jdbc.EmbeddedDriver");
         properties.put("username", "user");
@@ -113,7 +113,7 @@ public class JdbcDynamicTableFactoryTest {
     }
 
     @Test
-    public void testJdbcReadProperties() {
+    void testJdbcReadProperties() {
         Map<String, String> properties = getAllOptions();
         properties.put("scan.partition.column", "aaa");
         properties.put("scan.partition.lower-bound", "-10");
@@ -150,7 +150,7 @@ public class JdbcDynamicTableFactoryTest {
     }
 
     @Test
-    public void testJdbcLookupProperties() {
+    void testJdbcLookupProperties() {
         Map<String, String> properties = getAllOptions();
         properties.put("lookup.cache", "PARTIAL");
         properties.put("lookup.partial-cache.expire-after-write", "10s");
@@ -178,7 +178,7 @@ public class JdbcDynamicTableFactoryTest {
     }
 
     @Test
-    public void testJdbcLookupPropertiesWithLegacyOptions() {
+    void testJdbcLookupPropertiesWithLegacyOptions() {
         Map<String, String> properties = getAllOptions();
         properties.put("lookup.cache.max-rows", "1000");
         properties.put("lookup.cache.ttl", "10s");
@@ -206,7 +206,7 @@ public class JdbcDynamicTableFactoryTest {
     }
 
     @Test
-    public void testJdbcSinkProperties() {
+    void testJdbcSinkProperties() {
         Map<String, String> properties = getAllOptions();
         properties.put("sink.buffer-flush.max-rows", "1000");
         properties.put("sink.buffer-flush.interval", "2min");
@@ -241,7 +241,7 @@ public class JdbcDynamicTableFactoryTest {
     }
 
     @Test
-    public void testJDBCSinkWithParallelism() {
+    void testJDBCSinkWithParallelism() {
         Map<String, String> properties = getAllOptions();
         properties.put("sink.parallelism", "2");
 
@@ -275,7 +275,7 @@ public class JdbcDynamicTableFactoryTest {
     }
 
     @Test
-    public void testJdbcValidation() {
+    void testJdbcValidation() {
         // only password, no username
         Map<String, String> properties = getAllOptions();
         properties.put("password", "pass");
@@ -362,7 +362,7 @@ public class JdbcDynamicTableFactoryTest {
     }
 
     @Test
-    public void testJdbcLookupPropertiesWithExcludeEmptyResult() {
+    void testJdbcLookupPropertiesWithExcludeEmptyResult() {
         Map<String, String> properties = getAllOptions();
         properties.put("lookup.cache.max-rows", "1000");
         properties.put("lookup.cache.ttl", "10s");
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
index 037cda3..82a188d 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
@@ -46,9 +46,9 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -68,7 +68,7 @@ import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 
 /** The ITCase for {@link JdbcDynamicTableSink}. */
-public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
+class JdbcDynamicTableSinkITCase extends AbstractTestBase {
 
     public static final String DB_URL = "jdbc:derby:memory:upsert";
     public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
@@ -78,8 +78,8 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     public static final String OUTPUT_TABLE5 = "checkpointTable";
     public static final String USER_TABLE = "USER_TABLE";
 
-    @BeforeClass
-    public static void beforeAll() throws ClassNotFoundException, SQLException {
+    @BeforeAll
+    static void beforeAll() throws ClassNotFoundException, SQLException {
         System.setProperty(
                 "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
 
@@ -129,8 +129,8 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
         }
     }
 
-    @AfterClass
-    public static void afterAll() throws Exception {
+    @AfterAll
+    static void afterAll() throws Exception {
         TestValuesTableFactory.clearAllData();
         Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
         try (Connection conn = DriverManager.getConnection(DB_URL);
@@ -181,7 +181,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testReal() throws Exception {
+    void testReal() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         StreamTableEnvironment tEnv =
@@ -205,7 +205,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testUpsert() throws Exception {
+    void testUpsert() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -271,7 +271,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testAppend() throws Exception {
+    void testAppend() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().enableObjectReuse();
         env.getConfig().setParallelism(1);
@@ -312,7 +312,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testBatchSink() throws Exception {
+    void testBatchSink() throws Exception {
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
 
         tEnv.executeSql(
@@ -355,7 +355,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testReadingFromChangelogSource() throws Exception {
+    void testReadingFromChangelogSource() throws Exception {
         TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
         String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
         tEnv.executeSql(
@@ -420,7 +420,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
     }
 
     @Test
-    public void testFlushBufferWhenCheckpoint() throws Exception {
+    void testFlushBufferWhenCheckpoint() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "jdbc");
         options.put("url", DB_URL);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index d23c5d1..17e1be0 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -66,7 +66,7 @@ import java.util.stream.Stream;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** ITCase for {@link JdbcDynamicTableSource}. */
-public class JdbcDynamicTableSourceITCase {
+class JdbcDynamicTableSourceITCase {
 
     @RegisterExtension
     static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
index e7eb8a4..8bdfc67 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
@@ -41,9 +41,9 @@ import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.rex.RexBuilder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -57,11 +57,10 @@ import java.util.List;
 import java.util.Optional;
 import java.util.TimeZone;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link JdbcFilterPushdownPreparedStatementVisitor}. */
-public class JdbcFilterPushdownPreparedStatementVisitorTest {
+class JdbcFilterPushdownPreparedStatementVisitorTest {
 
     private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
     public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
@@ -71,8 +70,8 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
     public static StreamExecutionEnvironment env;
     public static TableEnvironment tEnv;
 
-    @Before
-    public void before() throws ClassNotFoundException, SQLException {
+    @BeforeEach
+    void before() throws ClassNotFoundException, SQLException {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
         tEnv = StreamTableEnvironment.create(env);
 
@@ -122,8 +121,8 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
                         + ")");
     }
 
-    @After
-    public void clearOutputTable() throws Exception {
+    @AfterEach
+    void clearOutputTable() throws Exception {
         Class.forName(DRIVER_CLASS);
         try (Connection conn = DriverManager.getConnection(DB_URL);
                 Statement stat = conn.createStatement()) {
@@ -133,7 +132,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
     }
 
     @Test
-    public void testSimpleExpressionPrimitiveType() {
+    void testSimpleExpressionPrimitiveType() {
         ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
         Arrays.asList(
                         new Object[] {"id = 6", "id = ?", 6L},
@@ -156,7 +155,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
     }
 
     @Test
-    public void testComplexExpressionDatetime() {
+    void testComplexExpressionDatetime() {
         ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
         String andExpr = "id = 6 AND timestamp6_col = TIMESTAMP '2022-01-01 07:00:01.333'";
         Serializable[] expectedParams1 = {6L, Timestamp.valueOf("2022-01-01 07:00:01.333000")};
@@ -171,7 +170,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
     }
 
     @Test
-    public void testExpressionWithNull() {
+    void testExpressionWithNull() {
         ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
         String andExpr = "id = NULL AND real_col <= 0.6";
 
@@ -186,7 +185,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
     }
 
     @Test
-    public void testExpressionIsNull() {
+    void testExpressionIsNull() {
         ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
         String andExpr = "id IS NULL AND real_col <= 0.6";
 
@@ -201,7 +200,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
     }
 
     @Test
-    public void testComplexExpressionPrimitiveType() {
+    void testComplexExpressionPrimitiveType() {
         ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
         String andExpr = "id = NULL AND real_col <= 0.6";
         Serializable[] expectedParams1 = {null, new BigDecimal("0.6")};
@@ -220,15 +219,15 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
             String expectedOutputExpr,
             Serializable[] expectedParams) {
         List<ResolvedExpression> resolved = resolveSQLFilterToExpression(inputExpr, schema);
-        assertEquals(1, resolved.size());
+        assertThat(resolved.size()).isEqualTo(1);
         JdbcDialect dialect = new DerbyDialectFactory().create();
         JdbcFilterPushdownPreparedStatementVisitor visitor =
                 new JdbcFilterPushdownPreparedStatementVisitor(dialect::quoteIdentifier);
         ParameterizedPredicate pred = resolved.get(0).accept(visitor).get();
 
         // our visitor always wrap expression
-        assertEquals(expectedOutputExpr, pred.getPredicate());
-        assertArrayEquals(expectedParams, pred.getParameters());
+        assertThat(pred.getPredicate()).isEqualTo(expectedOutputExpr);
+        assertThat(pred.getParameters()).isEqualTo(expectedParams);
     }
 
     private void assertSimpleInputExprEqualsOutExpr(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
index 5937946..700e196 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
@@ -31,13 +31,13 @@ import java.sql.Statement;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
 
 /** Base class for JDBC lookup test. */
-public class JdbcLookupTestBase {
+class JdbcLookupTestBase {
 
     public static final String DB_URL = "jdbc:derby:memory:lookup";
     public static final String LOOKUP_TABLE = "lookup_table";
 
     @BeforeEach
-    public void before() throws ClassNotFoundException, SQLException {
+    void before() throws ClassNotFoundException, SQLException {
         System.setProperty(
                 "derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
 
@@ -103,7 +103,7 @@ public class JdbcLookupTestBase {
     }
 
     @AfterEach
-    public void clearOutputTable() throws Exception {
+    void clearOutputTable() throws Exception {
         Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
         try (Connection conn = DriverManager.getConnection(DB_URL);
                 Statement stat = conn.createStatement()) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
index e870873..487d69c 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -56,7 +56,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test suite for {@link JdbcOutputFormatBuilder}. */
-public class JdbcOutputFormatTest extends JdbcDataTestBase {
+class JdbcOutputFormatTest extends JdbcDataTestBase {
 
     private static JdbcOutputFormat<RowData, ?, ?> outputFormat;
     private static String[] fieldNames = new String[] {"id", "title", "author", "price", "qty"};
@@ -76,8 +76,8 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
                     fieldNames);
     private static InternalTypeInfo<RowData> rowDataTypeInfo = InternalTypeInfo.of(rowType);
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterEach
+    void tearDown() throws Exception {
         if (outputFormat != null) {
             outputFormat.close();
         }
@@ -85,7 +85,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testInvalidDriver() {
+    void testInvalidDriver() {
         String expectedMsg = "unable to open JDBC writer";
         assertThatThrownBy(
                         () -> {
@@ -117,7 +117,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testInvalidURL() {
+    void testInvalidURL() {
         assertThatThrownBy(
                         () -> {
                             JdbcConnectorOptions jdbcOptions =
@@ -147,7 +147,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testIncompatibleTypes() {
+    void testIncompatibleTypes() {
         assertThatThrownBy(
                         () -> {
                             JdbcConnectorOptions jdbcOptions =
@@ -186,7 +186,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testExceptionOnInvalidType() {
+    void testExceptionOnInvalidType() {
         assertThatThrownBy(
                         () -> {
                             JdbcConnectorOptions jdbcOptions =
@@ -226,7 +226,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testExceptionOnClose() {
+    void testExceptionOnClose() {
         String expectedMsg = "Writing records to JDBC failed.";
         assertThatThrownBy(
                         () -> {
@@ -275,7 +275,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcOutputFormat() throws IOException, SQLException {
+    void testJdbcOutputFormat() throws IOException, SQLException {
         JdbcConnectorOptions jdbcOptions =
                 JdbcConnectorOptions.builder()
                         .setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -328,7 +328,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testFlush() throws SQLException, IOException {
+    void testFlush() throws SQLException, IOException {
         JdbcConnectorOptions jdbcOptions =
                 JdbcConnectorOptions.builder()
                         .setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -398,7 +398,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testFlushWithBatchSizeEqualsZero() throws SQLException, IOException {
+    void testFlushWithBatchSizeEqualsZero() throws SQLException, IOException {
         JdbcConnectorOptions jdbcOptions =
                 JdbcConnectorOptions.builder()
                         .setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -445,7 +445,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
+    void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
         JdbcConnectorOptions jdbcOptions =
                 JdbcConnectorOptions.builder()
                         .setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -506,8 +506,8 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
         }
     }
 
-    @After
-    public void clearOutputTable() throws Exception {
+    @AfterEach
+    void clearOutputTable() throws Exception {
         Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
         try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
                 Statement stat = conn.createStatement()) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
index 005f60c..1e9ce3a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
@@ -34,10 +34,8 @@ import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -52,11 +50,10 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_ALL_BOOKS_S
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_EMPTY;
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test suite for {@link JdbcRowDataInputFormat}. */
-public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
-
-    @Rule public ExpectedException thrown = ExpectedException.none();
+class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
 
     private JdbcRowDataInputFormat inputFormat;
     private static String[] fieldNames = new String[] {"id", "title", "author", "price", "qty"};
@@ -81,8 +78,8 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
                             .toArray(LogicalType[]::new),
                     fieldNames);
 
-    @After
-    public void tearDown() throws IOException {
+    @AfterEach
+    void tearDown() throws IOException {
         if (inputFormat != null) {
             inputFormat.close();
             inputFormat.closeInputFormat();
@@ -91,91 +88,115 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testNoRowConverter() throws IOException {
-        thrown.expect(NullPointerException.class);
-        thrown.expectMessage("No row converter supplied");
-        inputFormat =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .build();
-        inputFormat.openInputFormat();
+    void testNoRowConverter() {
+        assertThatThrownBy(
+                        () -> {
+                            inputFormat =
+                                    JdbcRowDataInputFormat.builder()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .build();
+                            inputFormat.openInputFormat();
+                        })
+                .isInstanceOf(NullPointerException.class);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidDriver() throws IOException {
-        inputFormat =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername("org.apache.derby.jdbc.idontexist")
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .setRowConverter(dialect.getRowConverter(rowType))
-                        .build();
-        inputFormat.openInputFormat();
+    @Test
+    void testInvalidDriver() {
+        assertThatThrownBy(
+                        () -> {
+                            inputFormat =
+                                    JdbcRowDataInputFormat.builder()
+                                            .setDrivername("org.apache.derby.jdbc.idontexist")
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .setRowConverter(dialect.getRowConverter(rowType))
+                                            .build();
+                            inputFormat.openInputFormat();
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidURL() throws IOException {
-        inputFormat =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .setRowConverter(dialect.getRowConverter(rowType))
-                        .build();
-        inputFormat.openInputFormat();
+    @Test
+    void testInvalidURL() {
+        assertThatThrownBy(
+                        () -> {
+                            inputFormat =
+                                    JdbcRowDataInputFormat.builder()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .setRowConverter(dialect.getRowConverter(rowType))
+                                            .build();
+                            inputFormat.openInputFormat();
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidQuery() throws IOException {
-        inputFormat =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setQuery("iamnotsql")
-                        .setRowConverter(dialect.getRowConverter(rowType))
-                        .build();
-        inputFormat.openInputFormat();
+    @Test
+    void testInvalidQuery() {
+        assertThatThrownBy(
+                        () -> {
+                            inputFormat =
+                                    JdbcRowDataInputFormat.builder()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setQuery("iamnotsql")
+                                            .setRowConverter(dialect.getRowConverter(rowType))
+                                            .build();
+                            inputFormat.openInputFormat();
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testNoQuery() throws IOException {
-        thrown.expect(NullPointerException.class);
-        thrown.expectMessage("No query supplied");
-        inputFormat =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setRowConverter(dialect.getRowConverter(rowType))
-                        .build();
+    void testNoQuery() {
+        assertThatThrownBy(
+                        () -> {
+                            inputFormat =
+                                    JdbcRowDataInputFormat.builder()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setRowConverter(dialect.getRowConverter(rowType))
+                                            .build();
+                        })
+                .isInstanceOf(NullPointerException.class)
+                .hasMessage("No query supplied");
     }
 
     @Test
-    public void testNoUrl() throws IOException {
-        thrown.expect(NullPointerException.class);
-        thrown.expectMessage("jdbc url is empty");
-        inputFormat =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .setRowConverter(dialect.getRowConverter(rowType))
-                        .build();
+    void testNoUrl() {
+        assertThatThrownBy(
+                        () -> {
+                            inputFormat =
+                                    JdbcRowDataInputFormat.builder()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .setRowConverter(dialect.getRowConverter(rowType))
+                                            .build();
+                        })
+                .isInstanceOf(NullPointerException.class)
+                .hasMessage("jdbc url is empty");
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testInvalidFetchSize() {
-        inputFormat =
-                JdbcRowDataInputFormat.builder()
-                        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
-                        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
-                        .setQuery(SELECT_ALL_BOOKS)
-                        .setFetchSize(-7)
-                        .build();
+    @Test
+    void testInvalidFetchSize() {
+        assertThatThrownBy(
+                        () -> {
+                            inputFormat =
+                                    JdbcRowDataInputFormat.builder()
+                                            .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+                                            .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+                                            .setQuery(SELECT_ALL_BOOKS)
+                                            .setFetchSize(-7)
+                                            .build();
+                        })
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    public void testValidFetchSizeIntegerMin() {
+    void testValidFetchSizeIntegerMin() {
         inputFormat =
                 JdbcRowDataInputFormat.builder()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -187,7 +208,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcInputFormatWithoutParallelism() throws IOException {
+    void testJdbcInputFormatWithoutParallelism() throws IOException {
         inputFormat =
                 JdbcRowDataInputFormat.builder()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -215,7 +236,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
+    void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
         final int fetchSize = 1;
         final long min = TEST_DATA[0].id;
         final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
@@ -253,8 +274,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting()
-            throws IOException {
+    void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
         final long min = TEST_DATA[0].id;
         final long max = TEST_DATA[TEST_DATA.length - 1].id;
         final long fetchSize = max + 1; // generate a single split
@@ -292,7 +312,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException {
+    void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException {
         Serializable[][] queryParameters = new String[2][1];
         queryParameters[0] = new String[] {TEST_DATA[3].author};
         queryParameters[1] = new String[] {TEST_DATA[0].author};
@@ -339,7 +359,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
     }
 
     @Test
-    public void testEmptyResults() throws IOException {
+    void testEmptyResults() throws IOException {
         inputFormat =
                 JdbcRowDataInputFormat.builder()
                         .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
index c6cd608..72b083e 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
@@ -41,7 +41,7 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test suite for {@link JdbcRowDataLookupFunction}. */
-public class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
+class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
 
     private static final String[] fieldNames = new String[] {"id1", "id2", "comment1", "comment2"};
     private static final DataType[] fieldDataTypes =
@@ -53,7 +53,7 @@ public class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
 
     @ParameterizedTest(name = "withFailure = {0}")
     @ValueSource(booleans = {false, true})
-    public void testLookup(boolean withFailure) throws Exception {
+    void testLookup(boolean withFailure) throws Exception {
         JdbcRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(withFailure);
 
         ListOutputCollector collector = new ListOutputCollector();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
index adac5b2..055e028 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 
 /** Plan tests for JDBC connector, for example, testing projection push down. */
 public class JdbcTablePlanTest extends TableTestBase {
-
+    // TODO: Migrate to JUnit 5 once TableTestBase has been migrated (or maybe copy the class).
     private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
 
     @Before
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
index d62da01..af03459 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
@@ -29,12 +29,13 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 import org.testcontainers.utility.DockerImageName;
 
 import java.math.BigDecimal;
@@ -56,7 +57,8 @@ import static org.assertj.core.api.Assertions.assertThat;
  * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL
  * to mock a DB.
  */
-public class UnsignedTypeConversionITCase extends AbstractTestBase {
+@Testcontainers
+class UnsignedTypeConversionITCase extends AbstractTestBase {
 
     private static final Logger LOGGER =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
@@ -96,8 +98,8 @@ public class UnsignedTypeConversionITCase extends AbstractTestBase {
                 new BigDecimal("18446744073709551615")
             };
 
-    @ClassRule
-    public static final MySQLContainer<?> MYSQL_CONTAINER =
+    @Container
+    static final MySQLContainer<?> MYSQL_CONTAINER =
             new MySQLContainer<>(MYSQL_57_IMAGE)
                     .withEnv(DEFAULT_CONTAINER_ENV_MAP)
                     .withUsername(USER)
@@ -106,7 +108,7 @@ public class UnsignedTypeConversionITCase extends AbstractTestBase {
                     .withLogConsumer(new Slf4jLogConsumer(LOGGER));
 
     @Test
-    public void testUnsignedType() throws Exception {
+    void testUnsignedType() throws Exception {
         try (Connection con =
                 DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), USER, PASSWORD)) {
             StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
index cf5ef28..b4c6720 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.utils;
 
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.sql.Types;
 
@@ -29,10 +29,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Testing the type conversions from Flink to SQL types. */
-public class JdbcTypeUtilTest {
+class JdbcTypeUtilTest {
 
     @Test
-    public void testTypeConversions() {
+    void testTypeConversions() {
         assertThat(logicalTypeToSqlType(LogicalTypeRoot.INTEGER)).isEqualTo(Types.INTEGER);
         testUnsupportedType(LogicalTypeRoot.RAW);
         testUnsupportedType(LogicalTypeRoot.MAP);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index 6da62e4..5c5867b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -42,18 +42,18 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.LogLevelRule;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import com.mysql.cj.jdbc.MysqlXADataSource;
 import oracle.jdbc.xa.client.OracleXADataSource;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.postgresql.xa.PGXADataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,10 +92,9 @@ import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingO
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.slf4j.event.Level.TRACE;
 
 /** A simple end-to-end test for {@link JdbcXaSinkFunction}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
     private static final Random RANDOM = new Random(System.currentTimeMillis());
 
@@ -104,14 +103,6 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
     private static final long CHECKPOINT_TIMEOUT_MS = 20_000L;
     private static final long TASK_CANCELLATION_TIMEOUT_MS = 20_000L;
 
-    // todo: remove after fixing FLINK-22889
-    @ClassRule
-    public static final LogLevelRule TEST_LOG_LEVEL_RULE =
-            new LogLevelRule()
-                    .set(JdbcExactlyOnceSinkE2eTest.class, TRACE)
-                    .set(XaFacadeImpl.class, TRACE)
-                    .set(MySqlJdbcExactlyOnceSinkTestEnv.InnoDbStatusLogger.class, TRACE);
-
     private interface JdbcExactlyOnceSinkTestEnv {
         void start();
 
@@ -124,7 +115,7 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
         int getParallelism();
     }
 
-    @Parameterized.Parameter public JdbcExactlyOnceSinkTestEnv dbEnv;
+    @Parameter public JdbcExactlyOnceSinkTestEnv dbEnv;
 
     private MiniClusterWithClientResource cluster;
 
@@ -138,7 +129,7 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
     // not using SharedObjects because we want to explicitly control which tag (attempt) to use
     private static final Map<Integer, CountDownLatch> inactiveMappers = new ConcurrentHashMap<>();
 
-    @Parameterized.Parameters(name = "{0}")
+    @Parameters(name = "{0}")
     public static Collection<JdbcExactlyOnceSinkTestEnv> parameters() {
         return Arrays.asList(
                 // PGSQL: check for issues with suspending connections (requires pooling) and
@@ -154,7 +145,7 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
                 );
     }
 
-    @Before
+    @BeforeEach
     public void before() throws Exception {
         Configuration configuration = new Configuration();
         // single failover region to allow checkpointing even after some sources have finished and
@@ -177,7 +168,7 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
         super.before();
     }
 
-    @After
+    @AfterEach
     @Override
     public void after() {
         // no need for cleanup - done by test container tear down
@@ -190,8 +181,8 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
         inactiveMappers.clear();
     }
 
-    @Test
-    public void testInsert() throws Exception {
+    @TestTemplate
+    void testInsert() throws Exception {
         long started = System.currentTimeMillis();
         LOG.info("Test insert for {}", dbEnv);
         int elementsPerSource = 50;
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
index 18e05e2..3ba4aba 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import javax.sql.XAConnection;
 import javax.sql.XADataSource;
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** {@link XaFacadeImpl} tests. */
-public class JdbcXaFacadeImplTest extends JdbcTestBase {
+class JdbcXaFacadeImplTest extends JdbcTestBase {
 
     private static final Xid XID =
             new Xid() {
@@ -58,7 +58,7 @@ public class JdbcXaFacadeImplTest extends JdbcTestBase {
             };
 
     @Test
-    public void testRecover() throws Exception {
+    void testRecover() throws Exception {
         try (XaFacade f = XaFacadeImpl.fromXaDataSource(getDbMetadata().buildXaDataSource())) {
             f.open();
             assertThat(f.recover()).isEmpty();
@@ -80,7 +80,7 @@ public class JdbcXaFacadeImplTest extends JdbcTestBase {
     }
 
     @Test
-    public void testClose() throws Exception {
+    void testClose() throws Exception {
         // some drivers (derby, H2) close both connection on either
         // connection.close/xaConnection.close() call, so use mocks here to:
         // a) prevent closing XA connection from connection.close()
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
index 39eef2c..c642b1f 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 
 import org.apache.derby.jdbc.EmbeddedXADataSource;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -31,14 +31,14 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
  * {@link JdbcXaSinkFunction} tests using Derby DB. Derby supports XA but doesn't use MVCC, so we
  * can't check anything before all transactions are completed.
  */
-public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
+class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
 
     /**
      * checkpoint > capture state > emit > snapshot > close > init(captured state), open > emit >
      * checkpoint.
      */
     @Test
-    public void noDuplication() throws Exception {
+    void noDuplication() throws Exception {
         sinkHelper.notifyCheckpointComplete(0);
         TestXaSinkStateHandler newState = new TestXaSinkStateHandler();
         newState.store(sinkHelper.getState().load(null));
@@ -50,7 +50,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testTxEndedOnClose() throws Exception {
+    void testTxEndedOnClose() throws Exception {
         sinkHelper.emit(
                 TEST_DATA[0]); // don't snapshotState to prevent transaction from being prepared
         sinkHelper.close();
@@ -58,7 +58,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testTxRollbackOnStartup() throws Exception {
+    void testTxRollbackOnStartup() throws Exception {
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
         xaHelper.assertPreparedTxCountEquals(1);
         sinkHelper.close();
@@ -73,7 +73,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testRestoreWithNotificationMissing() throws Exception {
+    void testRestoreWithNotificationMissing() throws Exception {
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
         sinkHelper.close();
         sinkHelper = buildSinkHelper(sinkHelper.getState());
@@ -82,7 +82,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testCommitUponStart() throws Exception {
+    void testCommitUponStart() throws Exception {
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
         sinkHelper.close();
         buildAndInit(0, XaFacadeImpl.fromXaDataSource(xaDataSource), sinkHelper.getState());
@@ -91,12 +91,12 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
 
     /** RM may return {@link javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} error. */
     @Test
-    public void testEmptyCheckpoint() throws Exception {
+    void testEmptyCheckpoint() throws Exception {
         sinkHelper.snapshotState(0);
     }
 
     @Test
-    public void testTwoCheckpointsComplete1st() throws Exception {
+    void testTwoCheckpointsComplete1st() throws Exception {
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
         JdbcXaSinkTestHelper sinkHelper = this.sinkHelper;
@@ -107,7 +107,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testTwoCheckpointsComplete2nd() throws Exception {
+    void testTwoCheckpointsComplete2nd() throws Exception {
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
         sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP1);
         xaHelper.assertDbContentsEquals(
@@ -117,7 +117,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testTwoCheckpointsCompleteBoth() throws Exception {
+    void testTwoCheckpointsCompleteBoth() throws Exception {
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
         sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP0.id);
@@ -129,7 +129,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testTwoCheckpointsCompleteBothOutOfOrder() throws Exception {
+    void testTwoCheckpointsCompleteBothOutOfOrder() throws Exception {
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
         sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP1.id);
@@ -141,7 +141,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testRestore() throws Exception {
+    void testRestore() throws Exception {
         sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP0);
         sinkHelper.close();
         sinkHelper = new JdbcXaSinkTestHelper(buildAndInit(), new TestXaSinkStateHandler());
@@ -150,7 +150,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testFailurePropagation() throws Exception {
+    void testFailurePropagation() throws Exception {
         /* big enough flush interval to cause error in snapshotState rather than in invoke*/
         sinkHelper =
                 new JdbcXaSinkTestHelper(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
index c869026..b9abcba 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.xa;
 import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -30,22 +30,22 @@ import static org.assertj.core.api.Assertions.assertThat;
  * transaction is not yet committed). But XA support isn't full, so for some scenarios {@link
  * org.apache.flink.connector.jdbc.xa.h2.H2XaDsWrapper wrapper} is used, and for some - Derby.
  */
-public class JdbcXaSinkH2Test extends JdbcXaSinkTestBase {
+class JdbcXaSinkH2Test extends JdbcXaSinkTestBase {
 
     @Test
-    public void testIgnoreDuplicatedNotification() throws Exception {
+    void testIgnoreDuplicatedNotification() throws Exception {
         sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP0);
         sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP0.id);
     }
 
     /** RM may return {@link javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} error. */
     @Test
-    public void testEmptyCheckpoint() throws Exception {
+    void testEmptyCheckpoint() throws Exception {
         sinkHelper.snapshotState(0);
     }
 
     @Test
-    public void testHappyFlow() throws Exception {
+    void testHappyFlow() throws Exception {
         sinkHelper.emit(TEST_DATA[0]);
         assertThat(xaHelper.countInDb())
                 .as("record should not be inserted before the checkpoint started.")
@@ -63,7 +63,7 @@ public class JdbcXaSinkH2Test extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testTwoCheckpointsWithoutData() throws Exception {
+    void testTwoCheckpointsWithoutData() throws Exception {
         JdbcXaSinkTestHelper sinkHelper = this.sinkHelper;
         sinkHelper.snapshotState(1);
         sinkHelper.snapshotState(2);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
index 3466e85..04e3576 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.Preconditions;
 
-import org.junit.After;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import javax.transaction.xa.Xid;
 
@@ -54,7 +56,7 @@ import static org.apache.flink.streaming.util.OperatorSnapshotUtil.readStateHand
 import static org.apache.flink.streaming.util.OperatorSnapshotUtil.writeStateHandle;
 
 /** Tests state migration for {@link JdbcXaSinkFunction}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class JdbcXaSinkMigrationTest extends JdbcTestBase {
 
     // write a snapshot:
@@ -66,7 +68,7 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
         writeSnapshot(parseVersionArg(args));
     }
 
-    @Parameterized.Parameters
+    @Parameters
     public static Collection<FlinkVersion> getReadVersions() {
         return Collections.emptyList();
     }
@@ -77,8 +79,9 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
 
     private final FlinkVersion readVersion;
 
-    @Test
-    public void testCommitFromSnapshot() throws Exception {
+    @TestTemplate
+    @Disabled // as getReadVersions is empty and fails
+    void testCommitFromSnapshot() throws Exception {
         preparePendingTransaction();
         try (OneInputStreamOperatorTestHarness<TestEntry, Object> harness =
                 createHarness(buildSink())) {
@@ -96,8 +99,8 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
         }
     }
 
-    @After
-    public void cleanUp() throws Exception {
+    @AfterEach
+    void cleanUp() throws Exception {
         cancelAllTx();
     }
 
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
index 4baf319..f1e4751 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
@@ -20,16 +20,16 @@ package org.apache.flink.connector.jdbc.xa;
 import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests that data is not inserted ahead of time. */
-public class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
+class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
 
     @Test
-    public void testNoInsertAfterInvoke() throws Exception {
+    void testNoInsertAfterInvoke() throws Exception {
         sinkHelper.emit(TEST_DATA[0]);
         assertThat(xaHelper.countInDb())
                 .as("no records should be inserted for incomplete checkpoints.")
@@ -37,7 +37,7 @@ public class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testNoInsertAfterSnapshot() throws Exception {
+    void testNoInsertAfterSnapshot() throws Exception {
         sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
         assertThat(xaHelper.countInDb())
                 .as("no records should be inserted for incomplete checkpoints.")
@@ -52,7 +52,7 @@ public class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
     }
 
     @Test
-    public void testNoInsertAfterFacadeClose() throws Exception {
+    void testNoInsertAfterFacadeClose() throws Exception {
         try (XaFacadeImpl xaFacade = XaFacadeImpl.fromXaDataSource(xaDataSource)) {
             sinkHelper =
                     new JdbcXaSinkTestHelper(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
index efaf4b2..3a688aa 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
@@ -52,8 +52,8 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 
 import javax.sql.XADataSource;
 import javax.transaction.xa.Xid;
@@ -75,14 +75,14 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
  * // todo: javadoc case Base class for {@link JdbcXaSinkFunction} tests. In addition to {@link
  * JdbcTestBase} init it initializes/closes helpers.
  */
-public abstract class JdbcXaSinkTestBase extends JdbcTestBase {
+abstract class JdbcXaSinkTestBase extends JdbcTestBase {
 
     JdbcXaFacadeTestHelper xaHelper;
     JdbcXaSinkTestHelper sinkHelper;
     XADataSource xaDataSource;
 
-    @Before
-    public void initHelpers() throws Exception {
+    @BeforeEach
+    void initHelpers() throws Exception {
         xaDataSource = getDbMetadata().buildXaDataSource();
         xaHelper =
                 new JdbcXaFacadeTestHelper(
@@ -98,8 +98,8 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase {
         return new TestXaSinkStateHandler();
     }
 
-    @After
-    public void closeHelpers() throws Exception {
+    @AfterEach
+    void closeHelpers() throws Exception {
         if (sinkHelper != null) {
             sinkHelper.close();
         }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
index 63fe55c..c105a91 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.xa;
 
 import org.apache.flink.api.common.JobID;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import javax.transaction.xa.Xid;
 
@@ -31,18 +31,18 @@ import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkTestBase.TEST_RUNTIME
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Simple uniqueness tests for the {@link SemanticXidGenerator}. */
-public class SemanticXidGeneratorTest {
+class SemanticXidGeneratorTest {
     private static final int COUNT = 100_000;
 
     @Test
-    public void testXidsUniqueAmongCheckpoints() {
+    void testXidsUniqueAmongCheckpoints() {
         SemanticXidGenerator xidGenerator = new SemanticXidGenerator();
         xidGenerator.open();
         checkUniqueness(checkpoint -> xidGenerator.generateXid(TEST_RUNTIME_CONTEXT, checkpoint));
     }
 
     @Test
-    public void testXidsUniqueAmongJobs() {
+    void testXidsUniqueAmongJobs() {
         long checkpointId = 1L;
         SemanticXidGenerator generator = new SemanticXidGenerator();
         checkUniqueness(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidImplTest.java
index 33a2053..58ca865 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidImplTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidImplTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connector.jdbc.xa;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.Random;
 
@@ -26,11 +26,11 @@ import static javax.transaction.xa.Xid.MAXGTRIDSIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link XidImpl}. */
-public class XidImplTest {
+class XidImplTest {
     static final XidImpl XID = new XidImpl(1, randomBytes(MAXGTRIDSIZE), randomBytes(MAXBQUALSIZE));
 
     @Test
-    public void testXidsEqual() {
+    void testXidsEqual() {
         XidImpl other =
                 new XidImpl(
                         XID.getFormatId(), XID.getGlobalTransactionId(), XID.getBranchQualifier());
@@ -38,7 +38,7 @@ public class XidImplTest {
     }
 
     @Test
-    public void testXidsNotEqual() {
+    void testXidsNotEqual() {
         assertThat(new XidImpl(0, XID.getGlobalTransactionId(), XID.getBranchQualifier()))
                 .isNotEqualTo(XID);
         assertThat(