You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/12/23 03:05:03 UTC
[flink-connector-jdbc] branch main updated: [FLINK-27940][tests] Migrate tests to junit5
This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 5ec3873 [FLINK-27940][tests] Migrate tests to junit5
5ec3873 is described below
commit 5ec38735e5ac2b12620e91ce4977b805fb04c5b8
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Fri Dec 23 04:04:58 2022 +0100
[FLINK-27940][tests] Migrate tests to junit5
This closes #12
---
.../connector/jdbc/JdbcConnectionOptionsTest.java | 48 ++---
.../flink/connector/jdbc/JdbcDataTestBase.java | 6 +-
.../flink/connector/jdbc/JdbcDataTypeTest.java | 18 +-
.../apache/flink/connector/jdbc/JdbcITCase.java | 6 +-
.../flink/connector/jdbc/JdbcInputFormatTest.java | 197 ++++++++++++---------
.../connector/jdbc/JdbcRowOutputFormatTest.java | 45 +++--
.../apache/flink/connector/jdbc/JdbcTestBase.java | 8 +-
.../jdbc/catalog/JdbcCatalogUtilsTest.java | 20 ++-
.../connector/jdbc/catalog/MySqlCatalogITCase.java | 86 ++++-----
.../jdbc/catalog/MySqlCatalogTestBase.java | 16 +-
.../jdbc/catalog/PostgresCatalogITCase.java | 28 +--
.../jdbc/catalog/PostgresCatalogTest.java | 76 ++++----
.../jdbc/catalog/PostgresCatalogTestBase.java | 20 +--
.../jdbc/catalog/PostgresTablePathTest.java | 6 +-
.../catalog/factory/JdbcCatalogFactoryTest.java | 20 ++-
.../converter/AbstractJdbcRowConverterTest.java | 6 +-
.../jdbc/dialect/mysql/MySqlDialectTest.java | 2 +-
.../oracle/OraclePreparedStatementTest.java | 16 +-
.../jdbc/dialect/oracle/OracleTableSinkITCase.java | 28 +--
.../dialect/oracle/OracleTableSourceITCase.java | 28 +--
.../sqlserver/SqlServerTableSinkITCase.java | 36 ++--
.../sqlserver/SqlServerTableSourceITCase.java | 28 +--
.../connector/jdbc/internal/JdbcFullTest.java | 16 +-
.../jdbc/internal/JdbcTableOutputFormatTest.java | 20 +--
...ProviderDriverClassConcurrentLoadingITCase.java | 11 +-
.../SimpleJdbcConnectionProviderTest.java | 20 +--
.../NumericBetweenParametersProviderTest.java | 16 +-
.../FieldNamedPreparedStatementImplTest.java | 16 +-
.../jdbc/table/JdbcAppendOnlyWriterTest.java | 81 +++++----
.../jdbc/table/JdbcDynamicTableFactoryTest.java | 20 +--
.../jdbc/table/JdbcDynamicTableSinkITCase.java | 28 +--
.../jdbc/table/JdbcDynamicTableSourceITCase.java | 2 +-
...FilterPushdownPreparedStatementVisitorTest.java | 35 ++--
.../connector/jdbc/table/JdbcLookupTestBase.java | 6 +-
.../connector/jdbc/table/JdbcOutputFormatTest.java | 32 ++--
.../jdbc/table/JdbcRowDataInputFormatTest.java | 186 ++++++++++---------
.../jdbc/table/JdbcRowDataLookupFunctionTest.java | 4 +-
.../connector/jdbc/table/JdbcTablePlanTest.java | 2 +-
.../jdbc/table/UnsignedTypeConversionITCase.java | 14 +-
.../connector/jdbc/utils/JdbcTypeUtilTest.java | 6 +-
.../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 37 ++--
.../connector/jdbc/xa/JdbcXaFacadeImplTest.java | 8 +-
.../connector/jdbc/xa/JdbcXaSinkDerbyTest.java | 28 +--
.../flink/connector/jdbc/xa/JdbcXaSinkH2Test.java | 12 +-
.../connector/jdbc/xa/JdbcXaSinkMigrationTest.java | 23 +--
.../jdbc/xa/JdbcXaSinkNoInsertionTest.java | 10 +-
.../connector/jdbc/xa/JdbcXaSinkTestBase.java | 14 +-
.../jdbc/xa/SemanticXidGeneratorTest.java | 8 +-
.../flink/connector/jdbc/xa/XidImplTest.java | 8 +-
49 files changed, 740 insertions(+), 667 deletions(-)
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java
index 7419c98..985f05a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcConnectionOptionsTest.java
@@ -19,35 +19,43 @@ package org.apache.flink.connector.jdbc;
import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link JdbcConnectionOptions}. */
-public class JdbcConnectionOptionsTest {
- @Test(expected = NullPointerException.class)
- public void testNullUrl() {
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withUrl(null)
- .withUsername("user")
- .withPassword("password")
- .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
- .build();
+class JdbcConnectionOptionsTest {
+ @Test
+ void testNullUrl() {
+ assertThatThrownBy(
+ () ->
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl(null)
+ .withUsername("user")
+ .withPassword("password")
+ .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
+ .build())
+ .isInstanceOf(NullPointerException.class);
}
@Test
- public void testNoOptionalOptions() {
+ void testNoOptionalOptions() {
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FakeDBUtils.TEST_DB_URL)
.build();
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidCheckTimeoutSeconds() {
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withUrl(FakeDBUtils.TEST_DB_URL)
- .withUsername("user")
- .withPassword("password")
- .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
- .withConnectionCheckTimeoutSeconds(0)
- .build();
+ @Test
+ void testInvalidCheckTimeoutSeconds() {
+ assertThatThrownBy(
+ () ->
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl(FakeDBUtils.TEST_DB_URL)
+ .withUsername("user")
+ .withPassword("password")
+ .withDriverName(FakeDBUtils.DRIVER1_CLASS_NAME)
+ .withConnectionCheckTimeoutSeconds(0)
+ .build())
+ .isInstanceOf(IllegalArgumentException.class);
}
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
index 2eb0de7..003cf57 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
-import org.junit.Before;
+import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mockito;
import java.sql.SQLException;
@@ -38,8 +38,8 @@ import static org.mockito.Mockito.doReturn;
* and inserts data before each test.
*/
public abstract class JdbcDataTestBase extends JdbcTestBase {
- @Before
- public void initData() throws SQLException {
+ @BeforeEach
+ void initData() throws SQLException {
JdbcTestFixture.initData(getDbMetadata());
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
index d3c35c5..75be19a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.connector.jdbc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import javax.annotation.Nullable;
@@ -35,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for all DataTypes and Dialects of JDBC connector. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
public class JdbcDataTypeTest {
private static final String DDL_FORMAT =
@@ -49,7 +51,7 @@ public class JdbcDataTypeTest {
+ " 'table-name'='myTable'\n"
+ ")";
- @Parameterized.Parameters(name = "{index}: {0}")
+ @Parameters(name = "{0}")
public static List<TestItem> testData() {
return Arrays.asList(
createTestItem("derby", "CHAR"),
@@ -178,10 +180,10 @@ public class JdbcDataTypeTest {
return item;
}
- @Parameterized.Parameter public TestItem testItem;
+ @Parameter public TestItem testItem;
- @Test
- public void testDataTypeValidate() {
+ @TestTemplate
+ void testDataTypeValidate() {
String sqlDDL = String.format(DDL_FORMAT, testItem.dataTypeExpr, testItem.dialect);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
index c24fc56..d1a830b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.function.FunctionWithException;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.Serializable;
import java.sql.Connection;
@@ -63,7 +63,7 @@ public class JdbcITCase extends JdbcTestBase {
};
@Test
- public void testInsert() throws Exception {
+ void testInsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
env.setParallelism(1);
@@ -82,7 +82,7 @@ public class JdbcITCase extends JdbcTestBase {
}
@Test
- public void testObjectReuse() throws Exception {
+ void testObjectReuse() throws Exception {
Configuration configuration = new Configuration();
configuration.set(OBJECT_REUSE, true);
StreamExecutionEnvironment env =
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
index 197fb63..92c0e79 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcInputFormatTest.java
@@ -24,10 +24,8 @@ import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.types.Row;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.Serializable;
@@ -44,16 +42,15 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_EMPTY;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link JdbcInputFormat}. */
-public class JdbcInputFormatTest extends JdbcDataTestBase {
-
- @Rule public ExpectedException thrown = ExpectedException.none();
+class JdbcInputFormatTest extends JdbcDataTestBase {
private JdbcInputFormat jdbcInputFormat;
- @After
- public void tearDown() throws IOException {
+ @AfterEach
+ void tearDown() throws IOException {
if (jdbcInputFormat != null) {
jdbcInputFormat.close();
jdbcInputFormat.closeInputFormat();
@@ -62,92 +59,117 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testUntypedRowInfo() throws IOException {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("No RowTypeInfo supplied");
- jdbcInputFormat =
- JdbcInputFormat.buildJdbcInputFormat()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setQuery(SELECT_ALL_BOOKS)
- .finish();
- jdbcInputFormat.openInputFormat();
+ void testUntypedRowInfo() {
+ assertThatThrownBy(
+ () -> {
+ jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(SELECT_ALL_BOOKS)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ })
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("No RowTypeInfo supplied");
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDriver() throws IOException {
- jdbcInputFormat =
- JdbcInputFormat.buildJdbcInputFormat()
- .setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(ROW_TYPE_INFO)
- .finish();
- jdbcInputFormat.openInputFormat();
+ @Test
+ void testInvalidDriver() {
+ assertThatThrownBy(
+ () -> {
+ jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername("org.apache.derby.jdbc.idontexist")
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ })
+ .isInstanceOf(IllegalArgumentException.class);
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidURL() throws IOException {
- jdbcInputFormat =
- JdbcInputFormat.buildJdbcInputFormat()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
- .setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(ROW_TYPE_INFO)
- .finish();
- jdbcInputFormat.openInputFormat();
+ @Test
+ void testInvalidURL() {
+ assertThatThrownBy(
+ () -> {
+ jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ })
+ .isInstanceOf(IllegalArgumentException.class);
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() throws IOException {
- jdbcInputFormat =
- JdbcInputFormat.buildJdbcInputFormat()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setQuery("iamnotsql")
- .setRowTypeInfo(ROW_TYPE_INFO)
- .finish();
- jdbcInputFormat.openInputFormat();
+ @Test
+ void testInvalidQuery() {
+ assertThatThrownBy(
+ () -> {
+ jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery("iamnotsql")
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .finish();
+ jdbcInputFormat.openInputFormat();
+ })
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testNoUrl() throws IOException {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("jdbc url is empty");
- jdbcInputFormat =
- JdbcInputFormat.buildJdbcInputFormat()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(ROW_TYPE_INFO)
- .finish();
+ void testNoUrl() {
+ assertThatThrownBy(
+ () -> {
+ jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .finish();
+ })
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("jdbc url is empty");
}
@Test
- public void testNoQuery() throws IOException {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("No query supplied");
- jdbcInputFormat =
- JdbcInputFormat.buildJdbcInputFormat()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setRowTypeInfo(ROW_TYPE_INFO)
- .finish();
+ void testNoQuery() {
+ assertThatThrownBy(
+ () -> {
+ jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .finish();
+ })
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("No query supplied");
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidFetchSize() {
- jdbcInputFormat =
- JdbcInputFormat.buildJdbcInputFormat()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setQuery(SELECT_ALL_BOOKS)
- .setRowTypeInfo(ROW_TYPE_INFO)
- .setFetchSize(-7)
- .finish();
+ @Test
+ void testInvalidFetchSize() {
+ assertThatThrownBy(
+ () -> {
+ jdbcInputFormat =
+ JdbcInputFormat.buildJdbcInputFormat()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowTypeInfo(ROW_TYPE_INFO)
+ .setFetchSize(-7)
+ .finish();
+ })
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testValidFetchSizeIntegerMin() {
+ void testValidFetchSizeIntegerMin() {
jdbcInputFormat =
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -159,7 +181,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise()
+ void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise()
throws SQLException, ClassNotFoundException {
jdbcInputFormat =
JdbcInputFormat.buildJdbcInputFormat()
@@ -180,7 +202,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testFetchSizeCanBeConfigured() throws SQLException {
+ void testFetchSizeCanBeConfigured() throws SQLException {
final int desiredFetchSize = 10_000;
jdbcInputFormat =
JdbcInputFormat.buildJdbcInputFormat()
@@ -195,7 +217,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testDefaultAutoCommitIsUsedIfNotConfiguredOtherwise()
+ void testDefaultAutoCommitIsUsedIfNotConfiguredOtherwise()
throws SQLException, ClassNotFoundException {
jdbcInputFormat =
@@ -215,7 +237,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testAutoCommitCanBeConfigured() throws SQLException {
+ void testAutoCommitCanBeConfigured() throws SQLException {
final boolean desiredAutoCommit = false;
jdbcInputFormat =
@@ -232,7 +254,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcInputFormatWithoutParallelism() throws IOException {
+ void testJdbcInputFormatWithoutParallelism() throws IOException {
jdbcInputFormat =
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -260,7 +282,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
+ void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
final int fetchSize = 1;
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
@@ -298,8 +320,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting()
- throws IOException {
+ void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - 1].id;
final long fetchSize = max + 1; // generate a single split
@@ -337,7 +358,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException {
+ void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException {
Serializable[][] queryParameters = new String[2][1];
queryParameters[0] = new String[] {TEST_DATA[3].author};
queryParameters[1] = new String[] {TEST_DATA[0].author};
@@ -383,7 +404,7 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testEmptyResults() throws IOException {
+ void testEmptyResults() throws IOException {
jdbcInputFormat =
JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
index 5f21362..3150a15 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcRowOutputFormatTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.connector.jdbc;
import org.apache.flink.types.Row;
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.sql.Connection;
@@ -51,20 +51,26 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for the {@link JdbcRowOutputFormat}. */
-public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
+class JdbcRowOutputFormatTest extends JdbcDataTestBase {
private JdbcRowOutputFormat jdbcOutputFormat;
- @After
- public void tearDown() throws IOException {
+ @AfterEach
+ void tearDown() throws Exception {
if (jdbcOutputFormat != null) {
jdbcOutputFormat.close();
}
jdbcOutputFormat = null;
+
+ Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
+ try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
+ Statement stat = conn.createStatement()) {
+ stat.execute("DELETE FROM " + OUTPUT_TABLE);
+ }
}
@Test
- public void testInvalidDriver() {
+ void testInvalidDriver() {
String expectedMsg = "unable to open JDBC writer";
try {
jdbcOutputFormat =
@@ -81,7 +87,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testInvalidURL() {
+ void testInvalidURL() {
String expectedMsg = "No suitable driver found for jdbc:der:iamanerror:mory:ebookshop";
jdbcOutputFormat =
@@ -96,7 +102,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testInvalidQuery() {
+ void testInvalidQuery() {
String expectedMsg = "unable to open JDBC writer";
try {
jdbcOutputFormat =
@@ -114,7 +120,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testIncompleteConfiguration() {
+ void testIncompleteConfiguration() {
String expectedMsg = "jdbc url is empty";
try {
jdbcOutputFormat =
@@ -129,7 +135,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testIncompatibleTypes() {
+ void testIncompatibleTypes() {
String expectedMsg = "Invalid character string format for type INTEGER.";
try {
jdbcOutputFormat =
@@ -157,7 +163,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testExceptionOnInvalidType() {
+ void testExceptionOnInvalidType() {
String expectedMsg = "field index: 3, field value: 0.";
try {
jdbcOutputFormat =
@@ -193,7 +199,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testExceptionOnClose() {
+ void testExceptionOnClose() {
String expectedMsg = "Writing records to JDBC failed.";
try {
jdbcOutputFormat =
@@ -232,7 +238,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcOutputFormat() throws IOException, SQLException {
+ void testJdbcOutputFormat() throws IOException, SQLException {
jdbcOutputFormat =
JdbcRowOutputFormat.buildJdbcOutputFormat()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -266,7 +272,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testFlush() throws SQLException, IOException {
+ void testFlush() throws SQLException, IOException {
jdbcOutputFormat =
JdbcRowOutputFormat.buildJdbcOutputFormat()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -306,7 +312,7 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
+ void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
jdbcOutputFormat =
JdbcRowOutputFormat.buildJdbcOutputFormat()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -346,13 +352,4 @@ public class JdbcRowOutputFormatTest extends JdbcDataTestBase {
assertThat(recordCount).isEqualTo(TEST_DATA.length);
}
}
-
- @After
- public void clearOutputTable() throws Exception {
- Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
- try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
- Statement stat = conn.createStatement()) {
- stat.execute("DELETE FROM " + OUTPUT_TABLE);
- }
- }
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
index 88daae4..1214a08 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
@@ -17,8 +17,8 @@
package org.apache.flink.connector.jdbc;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
/**
* Base class for JDBC test using DDL from {@link JdbcTestFixture}. It uses create tables before
@@ -26,12 +26,12 @@ import org.junit.Before;
*/
public abstract class JdbcTestBase {
- @Before
+ @BeforeEach
public void before() throws Exception {
JdbcTestFixture.initSchema(getDbMetadata());
}
- @After
+ @AfterEach
public void after() throws Exception {
JdbcTestFixture.cleanupData(getDbMetadata().getUrl());
JdbcTestFixture.cleanUpDatabasesStatic(getDbMetadata());
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java
index 087009e..2340714 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtilsTest.java
@@ -18,24 +18,26 @@
package org.apache.flink.connector.jdbc.catalog;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link JdbcCatalogUtils}. */
-public class JdbcCatalogUtilsTest {
- @Rule public ExpectedException exception = ExpectedException.none();
+class JdbcCatalogUtilsTest {
@Test
- public void testJdbcUrl() {
+ void testJdbcUrl() {
JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432/");
JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432");
}
@Test
- public void testInvalidJdbcUrl() {
- exception.expect(IllegalArgumentException.class);
- JdbcCatalogUtils.validateJdbcUrl("jdbc:postgresql://localhost:5432/db");
+ void testInvalidJdbcUrl() {
+ assertThatThrownBy(
+ () ->
+ JdbcCatalogUtils.validateJdbcUrl(
+ "jdbc:postgresql://localhost:5432/db"))
+ .isInstanceOf(IllegalArgumentException.class);
}
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
index c0b398e..9d57400 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
@@ -26,22 +26,24 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -51,7 +53,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** E2E test for {@link MySqlCatalog}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
public class MySqlCatalogITCase extends MySqlCatalogTestBase {
private static final List<Row> ALL_TYPES_ROWS =
@@ -154,8 +156,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
private final MySqlCatalog catalog;
private TableEnvironment tEnv;
- @Before
- public void setup() {
+ @BeforeEach
+ void setup() {
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
@@ -168,15 +170,15 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
catalog = CATALOGS.get(version);
}
- @Parameterized.Parameters(name = "version = {0}")
- public static String[] params() {
- return DOCKER_IMAGE_NAMES.toArray(new String[0]);
+ @Parameters(name = "version = {0}")
+ public static Collection<String> params() {
+ return DOCKER_IMAGE_NAMES;
}
// ------ databases ------
- @Test
- public void testGetDb_DatabaseNotExistException() throws Exception {
+ @TestTemplate
+ void testGetDb_DatabaseNotExistException() throws Exception {
String databaseNotExist = "nonexistent";
assertThatThrownBy(() -> catalog.getDatabase(databaseNotExist))
.satisfies(
@@ -187,14 +189,14 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
databaseNotExist)));
}
- @Test
- public void testListDatabases() {
+ @TestTemplate
+ void testListDatabases() {
List<String> actual = catalog.listDatabases();
assertThat(actual).containsExactly(TEST_DB, TEST_DB2);
}
- @Test
- public void testDbExists() throws Exception {
+ @TestTemplate
+ void testDbExists() throws Exception {
String databaseNotExist = "nonexistent";
assertThat(catalog.databaseExists(databaseNotExist)).isFalse();
assertThat(catalog.databaseExists(TEST_DB)).isTrue();
@@ -202,8 +204,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
// ------ tables ------
- @Test
- public void testListTables() throws DatabaseNotExistException {
+ @TestTemplate
+ void testListTables() throws DatabaseNotExistException {
List<String> actual = catalog.listTables(TEST_DB);
assertThat(actual)
.isEqualTo(
@@ -214,8 +216,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
TEST_TABLE_PK));
}
- @Test
- public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
+ @TestTemplate
+ void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
String anyDatabase = "anyDatabase";
assertThatThrownBy(() -> catalog.listTables(anyDatabase))
.satisfies(
@@ -225,15 +227,15 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
"Database %s does not exist in Catalog", anyDatabase)));
}
- @Test
- public void testTableExists() {
+ @TestTemplate
+ void testTableExists() {
String tableNotExist = "nonexist";
assertThat(catalog.tableExists(new ObjectPath(TEST_DB, tableNotExist))).isFalse();
assertThat(catalog.tableExists(new ObjectPath(TEST_DB, TEST_TABLE_ALL_TYPES))).isTrue();
}
- @Test
- public void testGetTables_TableNotExistException() throws TableNotExistException {
+ @TestTemplate
+ void testGetTables_TableNotExistException() throws TableNotExistException {
String anyTableNotExist = "anyTable";
assertThatThrownBy(() -> catalog.getTable(new ObjectPath(TEST_DB, anyTableNotExist)))
.satisfies(
@@ -244,8 +246,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
TEST_DB, anyTableNotExist)));
}
- @Test
- public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
+ @TestTemplate
+ void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
String databaseNotExist = "nonexistdb";
String tableNotExist = "anyTable";
assertThatThrownBy(() -> catalog.getTable(new ObjectPath(databaseNotExist, tableNotExist)))
@@ -257,14 +259,14 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
databaseNotExist, tableNotExist)));
}
- @Test
- public void testGetTable() throws TableNotExistException {
+ @TestTemplate
+ void testGetTable() throws TableNotExistException {
CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE_ALL_TYPES));
assertThat(table.getUnresolvedSchema()).isEqualTo(TABLE_SCHEMA);
}
- @Test
- public void testGetTablePrimaryKey() throws TableNotExistException {
+ @TestTemplate
+ void testGetTablePrimaryKey() throws TableNotExistException {
// test the PK of test.t_user
Schema tableSchemaTestPK1 =
Schema.newBuilder()
@@ -290,8 +292,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
// ------ test select query. ------
- @Test
- public void testSelectField() {
+ @TestTemplate
+ void testSelectField() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select pid from %s", TEST_TABLE_ALL_TYPES))
@@ -303,8 +305,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 2L)));
}
- @Test
- public void testWithoutCatalogDB() {
+ @TestTemplate
+ void testWithoutCatalogDB() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TEST_TABLE_ALL_TYPES))
@@ -314,8 +316,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
assertThat(results).isEqualTo(ALL_TYPES_ROWS);
}
- @Test
- public void testWithoutCatalog() {
+ @TestTemplate
+ void testWithoutCatalog() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(
@@ -327,8 +329,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
assertThat(results).isEqualTo(ALL_TYPES_ROWS);
}
- @Test
- public void testFullPath() {
+ @TestTemplate
+ void testFullPath() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(
@@ -342,8 +344,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
assertThat(results).isEqualTo(ALL_TYPES_ROWS);
}
- @Test
- public void testSelectToInsert() throws Exception {
+ @TestTemplate
+ void testSelectToInsert() throws Exception {
String sql =
String.format(
@@ -359,8 +361,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
assertThat(results).isEqualTo(ALL_TYPES_ROWS);
}
- @Test
- public void testGroupByInsert() throws Exception {
+ @TestTemplate
+ void testGroupByInsert() throws Exception {
// Changes primary key for the next record.
tEnv.executeSql(
String.format(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
index d92c89e..c58dcf4 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.api.Schema;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
@@ -38,12 +38,12 @@ import java.util.List;
import java.util.Map;
/** Test base for {@link MySqlCatalog}. */
-public class MySqlCatalogTestBase {
+class MySqlCatalogTestBase {
public static final Logger LOG = LoggerFactory.getLogger(MySqlCatalogTestBase.class);
protected static final List<String> DOCKER_IMAGE_NAMES =
- Arrays.asList("mysql:5.6.51", "mysql:5.7.34", "mysql:8.0.16");
+ Arrays.asList("mysql:5.6.51", "mysql:5.7.40", "mysql:8.0.31");
protected static final String TEST_CATALOG_NAME = "mysql_catalog";
protected static final String TEST_USERNAME = "mysql";
protected static final String TEST_PWD = "mysql";
@@ -114,8 +114,8 @@ public class MySqlCatalogTestBase {
public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new HashMap<>();
public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>();
- @BeforeClass
- public static void beforeAll() throws SQLException {
+ @BeforeAll
+ static void beforeAll() throws SQLException {
for (String dockerImageName : DOCKER_IMAGE_NAMES) {
MySQLContainer<?> container =
new MySQLContainer<>(DockerImageName.parse(dockerImageName))
@@ -140,8 +140,8 @@ public class MySqlCatalogTestBase {
}
}
- @AfterClass
- public static void cleanup() {
+ @AfterAll
+ static void cleanup() {
for (MySQLContainer<?> container : MYSQL_CONTAINERS.values()) {
container.stop();
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
index 78d142d..925f75d 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
@@ -23,8 +23,8 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.util.List;
@@ -33,12 +33,12 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
import static org.assertj.core.api.Assertions.assertThat;
/** E2E test for {@link PostgresCatalog}. */
-public class PostgresCatalogITCase extends PostgresCatalogTestBase {
+class PostgresCatalogITCase extends PostgresCatalogTestBase {
private TableEnvironment tEnv;
- @Before
- public void setup() {
+ @BeforeEach
+ void setup() {
this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
@@ -48,7 +48,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testSelectField() {
+ void testSelectField() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select id from %s", TABLE1))
@@ -58,7 +58,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testWithoutSchema() {
+ void testWithoutSchema() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE1))
@@ -68,7 +68,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testWithSchema() {
+ void testWithSchema() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(
@@ -81,7 +81,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testFullPath() {
+ void testFullPath() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(
@@ -96,7 +96,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testInsert() throws Exception {
+ void testInsert() throws Exception {
tEnv.executeSql(String.format("insert into %s select * from `%s`", TABLE4, TABLE1)).await();
List<Row> results =
@@ -108,7 +108,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testGroupByInsert() throws Exception {
+ void testGroupByInsert() throws Exception {
tEnv.executeSql(
String.format(
"insert into `%s` "
@@ -131,7 +131,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testPrimitiveTypes() {
+ void testPrimitiveTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE))
@@ -144,7 +144,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testArrayTypes() {
+ void testArrayTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_ARRAY_TYPE))
@@ -176,7 +176,7 @@ public class PostgresCatalogITCase extends PostgresCatalogTestBase {
}
@Test
- public void testSerialTypes() {
+ void testSerialTypes() {
List<Row> results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_SERIAL_TYPE))
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
index a5a9419..4286a49 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTest.java
@@ -24,34 +24,35 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link PostgresCatalog}. */
-public class PostgresCatalogTest extends PostgresCatalogTestBase {
+class PostgresCatalogTest extends PostgresCatalogTestBase {
// ------ databases ------
@Test
- public void testGetDb_DatabaseNotExistException() throws Exception {
- exception.expect(DatabaseNotExistException.class);
- exception.expectMessage("Database nonexistent does not exist in Catalog");
- catalog.getDatabase("nonexistent");
+ void testGetDb_DatabaseNotExistException() {
+ assertThatThrownBy(() -> catalog.getDatabase("nonexistent"))
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessageContaining("Database nonexistent does not exist in Catalog");
}
@Test
- public void testListDatabases() {
+ void testListDatabases() {
List<String> actual = catalog.listDatabases();
assertThat(actual).isEqualTo(Arrays.asList("postgres", "test"));
}
@Test
- public void testDbExists() throws Exception {
+ void testDbExists() {
assertThat(catalog.databaseExists("nonexistent")).isFalse();
assertThat(catalog.databaseExists(PostgresCatalog.DEFAULT_DATABASE)).isTrue();
@@ -60,7 +61,7 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
// ------ tables ------
@Test
- public void testListTables() throws DatabaseNotExistException {
+ void testListTables() throws DatabaseNotExistException {
List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
assertThat(actual)
@@ -80,13 +81,13 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
}
@Test
- public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
- exception.expect(DatabaseNotExistException.class);
- catalog.listTables("postgres/nonexistschema");
+ void testListTables_DatabaseNotExistException() {
+ assertThatThrownBy(() -> catalog.listTables("postgres/nonexistschema"))
+ .isInstanceOf(DatabaseNotExistException.class);
}
@Test
- public void testTableExists() {
+ void testTableExists() {
assertThat(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist"))).isFalse();
assertThat(catalog.tableExists(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE1)))
@@ -96,32 +97,43 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
}
@Test
- public void testGetTables_TableNotExistException() throws TableNotExistException {
- exception.expect(TableNotExistException.class);
- catalog.getTable(
- new ObjectPath(
- TEST_DB, PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+ void testGetTables_TableNotExistException() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ TEST_DB,
+ PostgresTablePath.toFlinkTableName(
+ TEST_SCHEMA, "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
}
@Test
- public void testGetTables_TableNotExistException_NoSchema() throws TableNotExistException {
- exception.expect(TableNotExistException.class);
- catalog.getTable(
- new ObjectPath(
- TEST_DB, PostgresTablePath.toFlinkTableName("nonexistschema", "anytable")));
+ void testGetTables_TableNotExistException_NoSchema() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ TEST_DB,
+ PostgresTablePath.toFlinkTableName(
+ "nonexistschema", "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
}
@Test
- public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
- exception.expect(TableNotExistException.class);
- catalog.getTable(
- new ObjectPath(
- "nonexistdb", PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
+ void testGetTables_TableNotExistException_NoDb() {
+ assertThatThrownBy(
+ () ->
+ catalog.getTable(
+ new ObjectPath(
+ "nonexistdb",
+ PostgresTablePath.toFlinkTableName(
+ TEST_SCHEMA, "anytable"))))
+ .isInstanceOf(TableNotExistException.class);
}
@Test
- public void testGetTable()
- throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
+ void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
// test postgres.public.user1
Schema schema = getSimpleTable().schema;
@@ -149,7 +161,7 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
}
@Test
- public void testPrimitiveDataTypes() throws TableNotExistException {
+ void testPrimitiveDataTypes() throws TableNotExistException {
CatalogBaseTable table =
catalog.getTable(
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE));
@@ -158,7 +170,7 @@ public class PostgresCatalogTest extends PostgresCatalogTestBase {
}
@Test
- public void testArrayDataTypes() throws TableNotExistException {
+ void testArrayDataTypes() throws TableNotExistException {
CatalogBaseTable table =
catalog.getTable(
new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE));
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 38a423d..2666bbc 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -23,14 +23,13 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.types.logical.DecimalType;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.sql.Connection;
@@ -39,12 +38,11 @@ import java.sql.SQLException;
import java.sql.Statement;
/** Test base for {@link PostgresCatalog}. */
-public class PostgresCatalogTestBase {
+@Testcontainers
+class PostgresCatalogTestBase {
public static final Logger LOG = LoggerFactory.getLogger(PostgresCatalogTestBase.class);
- @Rule public ExpectedException exception = ExpectedException.none();
-
protected static final DockerImageName POSTGRES_IMAGE =
DockerImageName.parse(DockerImageVersions.POSTGRES);
@@ -66,15 +64,15 @@ public class PostgresCatalogTestBase {
protected static String baseUrl;
protected static PostgresCatalog catalog;
- @ClassRule
- public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
+ @Container
+ static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
new PostgreSQLContainer<>(POSTGRES_IMAGE)
.withUsername(TEST_USERNAME)
.withPassword(TEST_PWD)
.withLogConsumer(new Slf4jLogConsumer(LOG));
- @BeforeClass
- public static void init() throws SQLException {
+ @BeforeAll
+ static void init() throws SQLException {
// jdbc:postgresql://localhost:50807/postgres?user=postgres
String jdbcUrl = POSTGRES_CONTAINER.getJdbcUrl();
// jdbc:postgresql://localhost:50807/
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePathTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePathTest.java
index db63c34..61314af 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePathTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresTablePathTest.java
@@ -18,14 +18,14 @@
package org.apache.flink.connector.jdbc.catalog;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link PostgresTablePath}. */
-public class PostgresTablePathTest {
+class PostgresTablePathTest {
@Test
- public void testFromFlinkTableName() {
+ void testFromFlinkTableName() {
assertThat(PostgresTablePath.fromFlinkTableName("public.topic"))
.isEqualTo(new PostgresTablePath("public", "topic"));
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
index 55dda31..e3aaefc 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
@@ -25,13 +25,14 @@ import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.factories.FactoryUtil;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.sql.SQLException;
@@ -41,7 +42,8 @@ import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link JdbcCatalogFactory}. */
-public class JdbcCatalogFactoryTest {
+@Testcontainers
+class JdbcCatalogFactoryTest {
public static final Logger LOG = LoggerFactory.getLogger(JdbcCatalogFactoryTest.class);
@@ -55,15 +57,15 @@ public class JdbcCatalogFactoryTest {
protected static final DockerImageName POSTGRES_IMAGE =
DockerImageName.parse(DockerImageVersions.POSTGRES);
- @ClassRule
- public static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
+ @Container
+ static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
new PostgreSQLContainer<>(POSTGRES_IMAGE)
.withUsername(TEST_USERNAME)
.withPassword(TEST_PWD)
.withLogConsumer(new Slf4jLogConsumer(LOG));
- @BeforeClass
- public static void setup() throws SQLException {
+ @BeforeAll
+ static void setup() throws SQLException {
// jdbc:postgresql://localhost:50807/postgres?user=postgres
String jdbcUrl = POSTGRES_CONTAINER.getJdbcUrl();
// jdbc:postgresql://localhost:50807/
@@ -80,7 +82,7 @@ public class JdbcCatalogFactoryTest {
}
@Test
- public void test() {
+ void test() {
final Map<String, String> options = new HashMap<>();
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), JdbcCatalogFactoryOptions.IDENTIFIER);
options.put(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
index 670b39c..9f9fee1 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverterTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.sql.ResultSet;
@@ -32,10 +32,10 @@ import java.time.LocalDateTime;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link AbstractJdbcRowConverter}. */
-public class AbstractJdbcRowConverterTest {
+class AbstractJdbcRowConverterTest {
@Test
- public void testExternalLocalDateTimeToTimestamp() throws Exception {
+ void testExternalLocalDateTimeToTimestamp() throws Exception {
RowType rowType = RowType.of(new IntType(), new TimestampType(3));
JdbcRowConverter rowConverter =
new AbstractJdbcRowConverter(rowType) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
index 006cc76..614020f 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/mysql/MySqlDialectTest.java
@@ -23,7 +23,7 @@ import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link MySqlDialect}. */
-public class MySqlDialectTest {
+class MySqlDialectTest {
@Test
void testAppendDefaultUrlProperties() {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
index b4827b6..79dee14 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
@@ -33,7 +33,7 @@ import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link OraclePreparedStatementTest}. */
-public class OraclePreparedStatementTest {
+class OraclePreparedStatementTest {
private final JdbcDialect dialect =
JdbcDialectLoader.load(
@@ -44,7 +44,7 @@ public class OraclePreparedStatementTest {
private final String tableName = "tbl";
@Test
- public void testInsertStatement() {
+ void testInsertStatement() {
String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
assertThat(insertStmt)
.isEqualTo(
@@ -64,7 +64,7 @@ public class OraclePreparedStatementTest {
}
@Test
- public void testDeleteStatement() {
+ void testDeleteStatement() {
String deleteStmt = dialect.getDeleteStatement(tableName, keyFields);
assertThat(deleteStmt)
.isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
@@ -75,7 +75,7 @@ public class OraclePreparedStatementTest {
}
@Test
- public void testRowExistsStatement() {
+ void testRowExistsStatement() {
String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
assertThat(rowExistStmt)
.isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
@@ -86,7 +86,7 @@ public class OraclePreparedStatementTest {
}
@Test
- public void testUpdateStatement() {
+ void testUpdateStatement() {
String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields);
assertThat(updateStmt)
.isEqualTo(
@@ -107,7 +107,7 @@ public class OraclePreparedStatementTest {
}
@Test
- public void testUpsertStatement() {
+ void testUpsertStatement() {
String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get();
assertThat(upsertStmt)
.isEqualTo(
@@ -135,7 +135,7 @@ public class OraclePreparedStatementTest {
}
@Test
- public void testSelectStatement() {
+ void testSelectStatement() {
String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
assertThat(selectStmt)
.isEqualTo(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java
index be8a6f8..8c9da92 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSinkITCase.java
@@ -45,9 +45,9 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.sql.Connection;
@@ -66,7 +66,7 @@ import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
/** The Table Sink ITCase for {@link OracleDialect}. */
-public class OracleTableSinkITCase extends AbstractTestBase {
+class OracleTableSinkITCase extends AbstractTestBase {
private static final OracleContainer container = new OracleContainer();
private static String containerUrl;
@@ -78,8 +78,8 @@ public class OracleTableSinkITCase extends AbstractTestBase {
public static final String OUTPUT_TABLE5 = "checkpointTable";
public static final String USER_TABLE = "USER_TABLE";
- @BeforeClass
- public static void beforeAll() throws ClassNotFoundException, SQLException {
+ @BeforeAll
+ static void beforeAll() throws ClassNotFoundException, SQLException {
container.start();
containerUrl = container.getJdbcUrl();
Class.forName(container.getDriverClassName());
@@ -128,8 +128,8 @@ public class OracleTableSinkITCase extends AbstractTestBase {
}
}
- @AfterClass
- public static void afterAll() throws Exception {
+ @AfterAll
+ static void afterAll() throws Exception {
TestValuesTableFactory.clearAllData();
Class.forName(container.getDriverClassName());
try (Connection conn = DriverManager.getConnection(containerUrl);
@@ -181,7 +181,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testReal() throws Exception {
+ void testReal() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv =
@@ -205,7 +205,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testUpsert() throws Exception {
+ void testUpsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -271,7 +271,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testAppend() throws Exception {
+ void testAppend() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.getConfig().setParallelism(1);
@@ -312,7 +312,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testBatchSink() throws Exception {
+ void testBatchSink() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.executeSql(
@@ -355,7 +355,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testReadingFromChangelogSource() throws Exception {
+ void testReadingFromChangelogSource() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
tEnv.executeSql(
@@ -420,7 +420,7 @@ public class OracleTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testFlushBufferWhenCheckpoint() throws Exception {
+ void testFlushBufferWhenCheckpoint() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("connector", "jdbc");
options.put("url", containerUrl);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java
index 4c0a15d..eb340c0 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OracleTableSourceITCase.java
@@ -25,10 +25,10 @@ import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -44,7 +44,7 @@ import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
/** The Table Source ITCase for {@link OracleDialect}. */
-public class OracleTableSourceITCase extends AbstractTestBase {
+class OracleTableSourceITCase extends AbstractTestBase {
private static final OracleContainer container = new OracleContainer();
private static String containerUrl;
@@ -53,8 +53,8 @@ public class OracleTableSourceITCase extends AbstractTestBase {
private static StreamExecutionEnvironment env;
private static TableEnvironment tEnv;
- @BeforeClass
- public static void beforeAll() throws ClassNotFoundException, SQLException {
+ @BeforeAll
+ static void beforeAll() throws ClassNotFoundException, SQLException {
container.start();
containerUrl = container.getJdbcUrl();
Class.forName(container.getDriverClassName());
@@ -96,8 +96,8 @@ public class OracleTableSourceITCase extends AbstractTestBase {
}
}
- @AfterClass
- public static void afterAll() throws Exception {
+ @AfterAll
+ static void afterAll() throws Exception {
Class.forName(container.getDriverClassName());
try (Connection conn = DriverManager.getConnection(containerUrl);
Statement statement = conn.createStatement()) {
@@ -106,14 +106,14 @@ public class OracleTableSourceITCase extends AbstractTestBase {
container.stop();
}
- @Before
- public void before() throws Exception {
+ @BeforeEach
+ void before() throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
}
@Test
- public void testJdbcSource() throws Exception {
+ void testJdbcSource() throws Exception {
tEnv.executeSql(
"CREATE TABLE "
+ INPUT_TABLE
@@ -158,7 +158,7 @@ public class OracleTableSourceITCase extends AbstractTestBase {
}
@Test
- public void testProject() throws Exception {
+ void testProject() throws Exception {
tEnv.executeSql(
"CREATE TABLE "
+ INPUT_TABLE
@@ -201,7 +201,7 @@ public class OracleTableSourceITCase extends AbstractTestBase {
}
@Test
- public void testLimit() throws Exception {
+ void testLimit() throws Exception {
tEnv.executeSql(
"CREATE TABLE "
+ INPUT_TABLE
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
index 0227a58..b40466b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
@@ -45,10 +45,12 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import java.math.BigDecimal;
import java.sql.Connection;
@@ -67,11 +69,14 @@ import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
/** The Table Sink ITCase for {@link SqlServerDialect}. */
-public class SqlServerTableSinkITCase extends AbstractTestBase {
+@Testcontainers
+class SqlServerTableSinkITCase extends AbstractTestBase {
- private static final MSSQLServerContainer container =
+ @Container
+ private static final MSSQLServerContainer<?> container =
new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
.acceptLicense();
+
private static String containerUrl;
public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
@@ -81,9 +86,8 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
public static final String OUTPUT_TABLE5 = "checkpointTable";
public static final String USER_TABLE = "USER_TABLE";
- @BeforeClass
- public static void beforeAll() throws ClassNotFoundException, SQLException {
- container.start();
+ @BeforeAll
+ static void beforeAll() throws ClassNotFoundException, SQLException {
containerUrl =
String.format(
"%s;username=%s;password=%s",
@@ -136,8 +140,8 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
}
}
- @AfterClass
- public static void afterAll() throws Exception {
+ @AfterAll
+ static void afterAll() throws Exception {
TestValuesTableFactory.clearAllData();
Class.forName(container.getDriverClassName());
try (Connection conn =
@@ -191,7 +195,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testReal() throws Exception {
+ void testReal() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv =
@@ -221,7 +225,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testUpsert() throws Exception {
+ void testUpsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -293,7 +297,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testAppend() throws Exception {
+ void testAppend() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.getConfig().setParallelism(1);
@@ -340,7 +344,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testBatchSink() throws Exception {
+ void testBatchSink() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.executeSql(
@@ -389,7 +393,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testReadingFromChangelogSource() throws Exception {
+ void testReadingFromChangelogSource() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
tEnv.executeSql(
@@ -460,7 +464,7 @@ public class SqlServerTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testFlushBufferWhenCheckpoint() throws Exception {
+ void testFlushBufferWhenCheckpoint() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("connector", "jdbc");
options.put("url", containerUrl);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java
index 2c9c73b..9abfe54 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java
@@ -25,10 +25,10 @@ import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MSSQLServerContainer;
import java.sql.Connection;
@@ -43,7 +43,7 @@ import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
/** The Table Source ITCase for {@link SqlServerDialect}. */
-public class SqlServerTableSourceITCase extends AbstractTestBase {
+class SqlServerTableSourceITCase extends AbstractTestBase {
private static final MSSQLServerContainer container =
new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
@@ -54,8 +54,8 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
private static StreamExecutionEnvironment env;
private static TableEnvironment tEnv;
- @BeforeClass
- public static void beforeAll() throws ClassNotFoundException, SQLException {
+ @BeforeAll
+ static void beforeAll() throws ClassNotFoundException, SQLException {
container.start();
containerUrl = container.getJdbcUrl();
Class.forName(container.getDriverClassName());
@@ -106,8 +106,8 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
}
}
- @AfterClass
- public static void afterAll() throws Exception {
+ @AfterAll
+ static void afterAll() throws Exception {
Class.forName(container.getDriverClassName());
try (Connection conn =
DriverManager.getConnection(
@@ -118,14 +118,14 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
container.stop();
}
- @Before
- public void before() throws Exception {
+ @BeforeEach
+ void before() throws Exception {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
}
@Test
- public void testJdbcSource() throws Exception {
+ void testJdbcSource() throws Exception {
createFlinkTable();
Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE).collect();
List<String> result =
@@ -149,7 +149,7 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
}
@Test
- public void testProject() throws Exception {
+ void testProject() throws Exception {
createFlinkTable();
Iterator<Row> collected =
tEnv.executeSql("SELECT id,datetime_col,decimal_col FROM " + INPUT_TABLE).collect();
@@ -168,7 +168,7 @@ public class SqlServerTableSourceITCase extends AbstractTestBase {
}
@Test
- public void testFilter() throws Exception {
+ void testFilter() throws Exception {
createFlinkTable();
Iterator<Row> collected =
tEnv.executeSql(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index b1904f3..e92a9ff 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -34,8 +34,8 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
import org.apache.flink.types.Row;
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.sql.Connection;
@@ -60,20 +60,20 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
/** Tests using both {@link JdbcInputFormat} and {@link JdbcOutputFormat}. */
-public class JdbcFullTest extends JdbcDataTestBase {
+class JdbcFullTest extends JdbcDataTestBase {
@Test
- public void testWithoutParallelism() throws Exception {
+ void testWithoutParallelism() throws Exception {
runTest(false);
}
@Test
- public void testWithParallelism() throws Exception {
+ void testWithParallelism() throws Exception {
runTest(true);
}
@Test
- public void testEnrichedClassCastException() {
+ void testEnrichedClassCastException() {
String expectedMsg = "field index: 3, field value: 11.11.";
try {
JdbcOutputFormat jdbcOutputFormat =
@@ -173,8 +173,8 @@ public class JdbcFullTest extends JdbcDataTestBase {
}
}
- @After
- public void clearOutputTable() throws Exception {
+ @AfterEach
+ void clearOutputTable() throws Exception {
Class.forName(getDbMetadata().getDriverClass());
try (Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
Statement stat = conn.createStatement()) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
index 9e684e3..938dcd1 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcTableOutputFormatTest.java
@@ -29,9 +29,9 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.types.Row;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.sql.Connection;
@@ -57,14 +57,14 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
private String[] fieldNames;
private String[] keyFields;
- @Before
- public void setup() {
+ @BeforeEach
+ void setup() {
fieldNames = new String[] {"id", "title", "author", "price", "qty"};
keyFields = new String[] {"id"};
}
@Test
- public void testUpsertFormatCloseBeforeOpen() throws Exception {
+ void testUpsertFormatCloseBeforeOpen() throws Exception {
JdbcConnectorOptions options =
JdbcConnectorOptions.builder()
.setDBUrl(getDbMetadata().getUrl())
@@ -91,7 +91,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
* JdbcOutputFormat#attemptFlush()} fails.
*/
@Test
- public void testDeleteExecutorUpdatedOnReconnect() throws Exception {
+ void testDeleteExecutorUpdatedOnReconnect() throws Exception {
// first fail flush from the main executor
boolean[] exceptionThrown = {false};
// then record whether the delete executor was updated
@@ -171,7 +171,7 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcOutputFormat() throws Exception {
+ void testJdbcOutputFormat() throws Exception {
JdbcConnectorOptions options =
JdbcConnectorOptions.builder()
.setDBUrl(getDbMetadata().getUrl())
@@ -246,8 +246,8 @@ public class JdbcTableOutputFormatTest extends JdbcDataTestBase {
}
}
- @After
- public void clearOutputTable() throws Exception {
+ @AfterEach
+ void clearOutputTable() throws Exception {
if (format != null) {
format.close();
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase.java
index ffd2e5e..5fb3676 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase.java
@@ -22,11 +22,13 @@ import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;
import org.apache.flink.core.testutils.CheckedThread;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
@@ -35,7 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* This test deals with sql driver class loading issues; run as an ITCase so it won't be interfered
* with by other tests.
*/
-public class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase {
+class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase {
private static boolean isClassLoaded(ClassLoader classLoader, String className)
throws Exception {
do {
@@ -50,8 +52,9 @@ public class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase {
return false;
}
- @Test(timeout = 5000)
- public void testDriverClassConcurrentLoading() throws Exception {
+ @Test
+ @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
+ void testDriverClassConcurrentLoading() throws Exception {
ClassLoader classLoader = getClass().getClassLoader();
assertThat(isClassLoaded(classLoader, FakeDBUtils.DRIVER1_CLASS_NAME)).isFalse();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderTest.java
index 1e19299..cf99e72 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProviderTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection1;
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection2;
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection3;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.sql.Connection;
import java.sql.Driver;
@@ -38,7 +38,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link SimpleJdbcConnectionProvider}. */
-public class SimpleJdbcConnectionProviderTest {
+class SimpleJdbcConnectionProviderTest {
private static JdbcConnectionProvider newFakeConnectionProviderWithDriverName(
String driverName) {
@@ -59,7 +59,7 @@ public class SimpleJdbcConnectionProviderTest {
}
@Test
- public void testEstablishConnection() throws Exception {
+ void testEstablishConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
assertThat(provider.getConnection()).isNull();
assertThat(provider.isConnectionValid()).isFalse();
@@ -75,7 +75,7 @@ public class SimpleJdbcConnectionProviderTest {
}
@Test
- public void testEstablishConnectionWithoutDriverName() throws Exception {
+ void testEstablishConnectionWithoutDriverName() throws Exception {
JdbcConnectionProvider provider = newProvider(FakeDBUtils.TEST_DB_URL, null);
assertThat(provider.getConnection()).isNull();
assertThat(provider.isConnectionValid()).isFalse();
@@ -93,7 +93,7 @@ public class SimpleJdbcConnectionProviderTest {
}
@Test
- public void testEstablishDriverConnection() throws Exception {
+ void testEstablishDriverConnection() throws Exception {
JdbcConnectionProvider provider1 =
newFakeConnectionProviderWithDriverName(FakeDBUtils.DRIVER1_CLASS_NAME);
Connection connection1 = provider1.getOrEstablishConnection();
@@ -106,7 +106,7 @@ public class SimpleJdbcConnectionProviderTest {
}
@Test
- public void testEstablishUnregisteredDriverConnection() throws Exception {
+ void testEstablishUnregisteredDriverConnection() throws Exception {
String unregisteredDriverName = FakeDBUtils.DRIVER3_CLASS_NAME;
Set<String> registeredDriverNames =
Collections.list(DriverManager.getDrivers()).stream()
@@ -122,7 +122,7 @@ public class SimpleJdbcConnectionProviderTest {
}
@Test
- public void testInvalidDriverUrl() {
+ void testInvalidDriverUrl() {
JdbcConnectionProvider provider =
newProvider(FakeDBUtils.TEST_DB_INVALID_URL, FakeDBUtils.DRIVER1_CLASS_NAME);
@@ -133,7 +133,7 @@ public class SimpleJdbcConnectionProviderTest {
}
@Test
- public void testCloseNullConnection() throws Exception {
+ void testCloseNullConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
provider.closeConnection();
assertThat(provider.getConnection()).isNull();
@@ -141,7 +141,7 @@ public class SimpleJdbcConnectionProviderTest {
}
@Test
- public void testCloseConnection() throws Exception {
+ void testCloseConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
Connection connection1 = provider.getOrEstablishConnection();
@@ -160,7 +160,7 @@ public class SimpleJdbcConnectionProviderTest {
}
@Test
- public void testReestablishCachedConnection() throws Exception {
+ void testReestablishCachedConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
Connection connection1 = provider.reestablishConnection();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
index cb1ebf3..25e350e 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/split/NumericBetweenParametersProviderTest.java
@@ -18,17 +18,17 @@
package org.apache.flink.connector.jdbc.split;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.Serializable;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link JdbcNumericBetweenParametersProvider}. */
-public class NumericBetweenParametersProviderTest {
+class NumericBetweenParametersProviderTest {
@Test
- public void testBatchSizeDivisible() {
+ void testBatchSizeDivisible() {
JdbcNumericBetweenParametersProvider provider =
new JdbcNumericBetweenParametersProvider(-5, 9).ofBatchSize(3);
Serializable[][] actual = provider.getParameterValues();
@@ -44,7 +44,7 @@ public class NumericBetweenParametersProviderTest {
}
@Test
- public void testBatchSizeNotDivisible() {
+ void testBatchSizeNotDivisible() {
JdbcNumericBetweenParametersProvider provider =
new JdbcNumericBetweenParametersProvider(-5, 11).ofBatchSize(4);
Serializable[][] actual = provider.getParameterValues();
@@ -60,7 +60,7 @@ public class NumericBetweenParametersProviderTest {
}
@Test
- public void testBatchSizeTooLarge() {
+ void testBatchSizeTooLarge() {
JdbcNumericBetweenParametersProvider provider =
new JdbcNumericBetweenParametersProvider(0, 2).ofBatchSize(5);
Serializable[][] actual = provider.getParameterValues();
@@ -70,7 +70,7 @@ public class NumericBetweenParametersProviderTest {
}
@Test
- public void testBatchNumDivisible() {
+ void testBatchNumDivisible() {
JdbcNumericBetweenParametersProvider provider =
new JdbcNumericBetweenParametersProvider(-5, 9).ofBatchNum(5);
Serializable[][] actual = provider.getParameterValues();
@@ -86,7 +86,7 @@ public class NumericBetweenParametersProviderTest {
}
@Test
- public void testBatchNumNotDivisible() {
+ void testBatchNumNotDivisible() {
JdbcNumericBetweenParametersProvider provider =
new JdbcNumericBetweenParametersProvider(-5, 11).ofBatchNum(5);
Serializable[][] actual = provider.getParameterValues();
@@ -102,7 +102,7 @@ public class NumericBetweenParametersProviderTest {
}
@Test
- public void testBatchNumTooLarge() {
+ void testBatchNumTooLarge() {
JdbcNumericBetweenParametersProvider provider =
new JdbcNumericBetweenParametersProvider(0, 2).ofBatchNum(5);
Serializable[][] actual = provider.getParameterValues();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
index 013c711..b1d5684 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.jdbc.statement;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
@@ -32,7 +32,7 @@ import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link FieldNamedPreparedStatementImpl}. */
-public class FieldNamedPreparedStatementImplTest {
+class FieldNamedPreparedStatementImplTest {
private final JdbcDialect dialect =
JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", getClass().getClassLoader());
@@ -42,7 +42,7 @@ public class FieldNamedPreparedStatementImplTest {
private final String tableName = "tbl";
@Test
- public void testInsertStatement() {
+ void testInsertStatement() {
String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
assertThat(insertStmt)
.isEqualTo(
@@ -62,7 +62,7 @@ public class FieldNamedPreparedStatementImplTest {
}
@Test
- public void testDeleteStatement() {
+ void testDeleteStatement() {
String deleteStmt = dialect.getDeleteStatement(tableName, keyFields);
assertThat(deleteStmt)
.isEqualTo("DELETE FROM `tbl` WHERE `id` = :id AND `__field_3__` = :__field_3__");
@@ -73,7 +73,7 @@ public class FieldNamedPreparedStatementImplTest {
}
@Test
- public void testRowExistsStatement() {
+ void testRowExistsStatement() {
String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
assertThat(rowExistStmt)
.isEqualTo("SELECT 1 FROM `tbl` WHERE `id` = :id AND `__field_3__` = :__field_3__");
@@ -84,7 +84,7 @@ public class FieldNamedPreparedStatementImplTest {
}
@Test
- public void testUpdateStatement() {
+ void testUpdateStatement() {
String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields);
assertThat(updateStmt)
.isEqualTo(
@@ -105,7 +105,7 @@ public class FieldNamedPreparedStatementImplTest {
}
@Test
- public void testUpsertStatement() {
+ void testUpsertStatement() {
String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get();
assertThat(upsertStmt)
.isEqualTo(
@@ -130,7 +130,7 @@ public class FieldNamedPreparedStatementImplTest {
}
@Test
- public void testSelectStatement() {
+ void testSelectStatement() {
String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
assertThat(selectStmt)
.isEqualTo(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
index 66b0cc4..34ad9e1 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
@@ -27,9 +27,9 @@ import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
@@ -42,50 +42,57 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB
import static org.apache.flink.connector.jdbc.JdbcTestFixture.OUTPUT_TABLE;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.doReturn;
/** Test for the Append only mode. */
-public class JdbcAppendOnlyWriterTest extends JdbcTestBase {
+class JdbcAppendOnlyWriterTest extends JdbcTestBase {
private JdbcOutputFormat format;
private String[] fieldNames;
- @Before
- public void setup() {
+ @BeforeEach
+ void setup() {
fieldNames = new String[] {"id", "title", "author", "price", "qty"};
}
- @Test(expected = IOException.class)
- public void testMaxRetry() throws Exception {
- format =
- JdbcOutputFormat.builder()
- .setOptions(
- JdbcConnectorOptions.builder()
- .setDBUrl(getDbMetadata().getUrl())
- .setDialect(
- JdbcDialectLoader.load(
- getDbMetadata().getUrl(),
- getClass().getClassLoader()))
- .setTableName(OUTPUT_TABLE)
- .build())
- .setFieldNames(fieldNames)
- .setKeyFields(null)
- .build();
- RuntimeContext context = Mockito.mock(RuntimeContext.class);
- ExecutionConfig config = Mockito.mock(ExecutionConfig.class);
- doReturn(config).when(context).getExecutionConfig();
- doReturn(true).when(config).isObjectReuseEnabled();
- format.setRuntimeContext(context);
- format.open(0, 1);
+ @Test
+ void testMaxRetry() throws Exception {
+ assertThatThrownBy(
+ () -> {
+ format =
+ JdbcOutputFormat.builder()
+ .setOptions(
+ JdbcConnectorOptions.builder()
+ .setDBUrl(getDbMetadata().getUrl())
+ .setDialect(
+ JdbcDialectLoader.load(
+ getDbMetadata()
+ .getUrl(),
+ getClass()
+ .getClassLoader()))
+ .setTableName(OUTPUT_TABLE)
+ .build())
+ .setFieldNames(fieldNames)
+ .setKeyFields(null)
+ .build();
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ ExecutionConfig config = Mockito.mock(ExecutionConfig.class);
+ doReturn(config).when(context).getExecutionConfig();
+ doReturn(true).when(config).isObjectReuseEnabled();
+ format.setRuntimeContext(context);
+ format.open(0, 1);
- // alter table schema to trigger retry logic after failure.
- alterTable();
- for (TestEntry entry : TEST_DATA) {
- format.writeRecord(Tuple2.of(true, toRow(entry)));
- }
+ // alter table schema to trigger retry logic after failure.
+ alterTable();
+ for (TestEntry entry : TEST_DATA) {
+ format.writeRecord(Tuple2.of(true, toRow(entry)));
+ }
- // after retry default times, throws a BatchUpdateException.
- format.flush();
+ // after retry default times, throws a BatchUpdateException.
+ format.flush();
+ })
+ .isInstanceOf(IOException.class);
}
private void alterTable() throws Exception {
@@ -96,8 +103,8 @@ public class JdbcAppendOnlyWriterTest extends JdbcTestBase {
}
}
- @After
- public void clear() throws Exception {
+ @AfterEach
+ void clear() throws Exception {
if (format != null) {
try {
format.close();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
index 13617ef..44d4d65 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactoryTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Arrays;
@@ -49,7 +49,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
* Test for {@link JdbcDynamicTableSource} and {@link JdbcDynamicTableSink} created by {@link
* JdbcDynamicTableFactory}.
*/
-public class JdbcDynamicTableFactoryTest {
+class JdbcDynamicTableFactoryTest {
private static final ResolvedSchema SCHEMA =
new ResolvedSchema(
@@ -63,7 +63,7 @@ public class JdbcDynamicTableFactoryTest {
UniqueConstraint.primaryKey("name", Arrays.asList("bbb", "aaa")));
@Test
- public void testJdbcCommonProperties() {
+ void testJdbcCommonProperties() {
Map<String, String> properties = getAllOptions();
properties.put("driver", "org.apache.derby.jdbc.EmbeddedDriver");
properties.put("username", "user");
@@ -113,7 +113,7 @@ public class JdbcDynamicTableFactoryTest {
}
@Test
- public void testJdbcReadProperties() {
+ void testJdbcReadProperties() {
Map<String, String> properties = getAllOptions();
properties.put("scan.partition.column", "aaa");
properties.put("scan.partition.lower-bound", "-10");
@@ -150,7 +150,7 @@ public class JdbcDynamicTableFactoryTest {
}
@Test
- public void testJdbcLookupProperties() {
+ void testJdbcLookupProperties() {
Map<String, String> properties = getAllOptions();
properties.put("lookup.cache", "PARTIAL");
properties.put("lookup.partial-cache.expire-after-write", "10s");
@@ -178,7 +178,7 @@ public class JdbcDynamicTableFactoryTest {
}
@Test
- public void testJdbcLookupPropertiesWithLegacyOptions() {
+ void testJdbcLookupPropertiesWithLegacyOptions() {
Map<String, String> properties = getAllOptions();
properties.put("lookup.cache.max-rows", "1000");
properties.put("lookup.cache.ttl", "10s");
@@ -206,7 +206,7 @@ public class JdbcDynamicTableFactoryTest {
}
@Test
- public void testJdbcSinkProperties() {
+ void testJdbcSinkProperties() {
Map<String, String> properties = getAllOptions();
properties.put("sink.buffer-flush.max-rows", "1000");
properties.put("sink.buffer-flush.interval", "2min");
@@ -241,7 +241,7 @@ public class JdbcDynamicTableFactoryTest {
}
@Test
- public void testJDBCSinkWithParallelism() {
+ void testJDBCSinkWithParallelism() {
Map<String, String> properties = getAllOptions();
properties.put("sink.parallelism", "2");
@@ -275,7 +275,7 @@ public class JdbcDynamicTableFactoryTest {
}
@Test
- public void testJdbcValidation() {
+ void testJdbcValidation() {
// only password, no username
Map<String, String> properties = getAllOptions();
properties.put("password", "pass");
@@ -362,7 +362,7 @@ public class JdbcDynamicTableFactoryTest {
}
@Test
- public void testJdbcLookupPropertiesWithExcludeEmptyResult() {
+ void testJdbcLookupPropertiesWithExcludeEmptyResult() {
Map<String, String> properties = getAllOptions();
properties.put("lookup.cache.max-rows", "1000");
properties.put("lookup.cache.ttl", "10s");
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
index 037cda3..82a188d 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSinkITCase.java
@@ -46,9 +46,9 @@ import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
import java.sql.Connection;
@@ -68,7 +68,7 @@ import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
/** The ITCase for {@link JdbcDynamicTableSink}. */
-public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
+class JdbcDynamicTableSinkITCase extends AbstractTestBase {
public static final String DB_URL = "jdbc:derby:memory:upsert";
public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
@@ -78,8 +78,8 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
public static final String OUTPUT_TABLE5 = "checkpointTable";
public static final String USER_TABLE = "USER_TABLE";
- @BeforeClass
- public static void beforeAll() throws ClassNotFoundException, SQLException {
+ @BeforeAll
+ static void beforeAll() throws ClassNotFoundException, SQLException {
System.setProperty(
"derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
@@ -129,8 +129,8 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
}
}
- @AfterClass
- public static void afterAll() throws Exception {
+ @AfterAll
+ static void afterAll() throws Exception {
TestValuesTableFactory.clearAllData();
Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
try (Connection conn = DriverManager.getConnection(DB_URL);
@@ -181,7 +181,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testReal() throws Exception {
+ void testReal() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv =
@@ -205,7 +205,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testUpsert() throws Exception {
+ void testUpsert() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
@@ -271,7 +271,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testAppend() throws Exception {
+ void testAppend() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.getConfig().setParallelism(1);
@@ -312,7 +312,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testBatchSink() throws Exception {
+ void testBatchSink() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.executeSql(
@@ -355,7 +355,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testReadingFromChangelogSource() throws Exception {
+ void testReadingFromChangelogSource() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
tEnv.executeSql(
@@ -420,7 +420,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
}
@Test
- public void testFlushBufferWhenCheckpoint() throws Exception {
+ void testFlushBufferWhenCheckpoint() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("connector", "jdbc");
options.put("url", DB_URL);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index d23c5d1..17e1be0 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -66,7 +66,7 @@ import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for {@link JdbcDynamicTableSource}. */
-public class JdbcDynamicTableSourceITCase {
+class JdbcDynamicTableSourceITCase {
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
index e7eb8a4..8bdfc67 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcFilterPushdownPreparedStatementVisitorTest.java
@@ -41,9 +41,9 @@ import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
import org.apache.flink.table.types.logical.RowType;
import org.apache.calcite.rex.RexBuilder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -57,11 +57,10 @@ import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link JdbcFilterPushdownPreparedStatementVisitor}. */
-public class JdbcFilterPushdownPreparedStatementVisitorTest {
+class JdbcFilterPushdownPreparedStatementVisitorTest {
private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
@@ -71,8 +70,8 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
public static StreamExecutionEnvironment env;
public static TableEnvironment tEnv;
- @Before
- public void before() throws ClassNotFoundException, SQLException {
+ @BeforeEach
+ void before() throws ClassNotFoundException, SQLException {
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
@@ -122,8 +121,8 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
+ ")");
}
- @After
- public void clearOutputTable() throws Exception {
+ @AfterEach
+ void clearOutputTable() throws Exception {
Class.forName(DRIVER_CLASS);
try (Connection conn = DriverManager.getConnection(DB_URL);
Statement stat = conn.createStatement()) {
@@ -133,7 +132,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
}
@Test
- public void testSimpleExpressionPrimitiveType() {
+ void testSimpleExpressionPrimitiveType() {
ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
Arrays.asList(
new Object[] {"id = 6", "id = ?", 6L},
@@ -156,7 +155,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
}
@Test
- public void testComplexExpressionDatetime() {
+ void testComplexExpressionDatetime() {
ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
String andExpr = "id = 6 AND timestamp6_col = TIMESTAMP '2022-01-01 07:00:01.333'";
Serializable[] expectedParams1 = {6L, Timestamp.valueOf("2022-01-01 07:00:01.333000")};
@@ -171,7 +170,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
}
@Test
- public void testExpressionWithNull() {
+ void testExpressionWithNull() {
ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
String andExpr = "id = NULL AND real_col <= 0.6";
@@ -186,7 +185,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
}
@Test
- public void testExpressionIsNull() {
+ void testExpressionIsNull() {
ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
String andExpr = "id IS NULL AND real_col <= 0.6";
@@ -201,7 +200,7 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
}
@Test
- public void testComplexExpressionPrimitiveType() {
+ void testComplexExpressionPrimitiveType() {
ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
String andExpr = "id = NULL AND real_col <= 0.6";
Serializable[] expectedParams1 = {null, new BigDecimal("0.6")};
@@ -220,15 +219,15 @@ public class JdbcFilterPushdownPreparedStatementVisitorTest {
String expectedOutputExpr,
Serializable[] expectedParams) {
List<ResolvedExpression> resolved = resolveSQLFilterToExpression(inputExpr, schema);
- assertEquals(1, resolved.size());
+ assertThat(resolved.size()).isEqualTo(1);
JdbcDialect dialect = new DerbyDialectFactory().create();
JdbcFilterPushdownPreparedStatementVisitor visitor =
new JdbcFilterPushdownPreparedStatementVisitor(dialect::quoteIdentifier);
ParameterizedPredicate pred = resolved.get(0).accept(visitor).get();
// our visitor always wrap expression
- assertEquals(expectedOutputExpr, pred.getPredicate());
- assertArrayEquals(expectedParams, pred.getParameters());
+ assertThat(pred.getPredicate()).isEqualTo(expectedOutputExpr);
+ assertThat(pred.getParameters()).isEqualTo(expectedParams);
}
private void assertSimpleInputExprEqualsOutExpr(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
index 5937946..700e196 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
@@ -31,13 +31,13 @@ import java.sql.Statement;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
/** Base class for JDBC lookup test. */
-public class JdbcLookupTestBase {
+class JdbcLookupTestBase {
public static final String DB_URL = "jdbc:derby:memory:lookup";
public static final String LOOKUP_TABLE = "lookup_table";
@BeforeEach
- public void before() throws ClassNotFoundException, SQLException {
+ void before() throws ClassNotFoundException, SQLException {
System.setProperty(
"derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
@@ -103,7 +103,7 @@ public class JdbcLookupTestBase {
}
@AfterEach
- public void clearOutputTable() throws Exception {
+ void clearOutputTable() throws Exception {
Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
try (Connection conn = DriverManager.getConnection(DB_URL);
Statement stat = conn.createStatement()) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
index e870873..487d69c 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcOutputFormatTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.sql.Connection;
@@ -56,7 +56,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test suite for {@link JdbcOutputFormatBuilder}. */
-public class JdbcOutputFormatTest extends JdbcDataTestBase {
+class JdbcOutputFormatTest extends JdbcDataTestBase {
private static JdbcOutputFormat<RowData, ?, ?> outputFormat;
private static String[] fieldNames = new String[] {"id", "title", "author", "price", "qty"};
@@ -76,8 +76,8 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
fieldNames);
private static InternalTypeInfo<RowData> rowDataTypeInfo = InternalTypeInfo.of(rowType);
- @After
- public void tearDown() throws Exception {
+ @AfterEach
+ void tearDown() throws Exception {
if (outputFormat != null) {
outputFormat.close();
}
@@ -85,7 +85,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testInvalidDriver() {
+ void testInvalidDriver() {
String expectedMsg = "unable to open JDBC writer";
assertThatThrownBy(
() -> {
@@ -117,7 +117,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testInvalidURL() {
+ void testInvalidURL() {
assertThatThrownBy(
() -> {
JdbcConnectorOptions jdbcOptions =
@@ -147,7 +147,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testIncompatibleTypes() {
+ void testIncompatibleTypes() {
assertThatThrownBy(
() -> {
JdbcConnectorOptions jdbcOptions =
@@ -186,7 +186,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testExceptionOnInvalidType() {
+ void testExceptionOnInvalidType() {
assertThatThrownBy(
() -> {
JdbcConnectorOptions jdbcOptions =
@@ -226,7 +226,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testExceptionOnClose() {
+ void testExceptionOnClose() {
String expectedMsg = "Writing records to JDBC failed.";
assertThatThrownBy(
() -> {
@@ -275,7 +275,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcOutputFormat() throws IOException, SQLException {
+ void testJdbcOutputFormat() throws IOException, SQLException {
JdbcConnectorOptions jdbcOptions =
JdbcConnectorOptions.builder()
.setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -328,7 +328,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testFlush() throws SQLException, IOException {
+ void testFlush() throws SQLException, IOException {
JdbcConnectorOptions jdbcOptions =
JdbcConnectorOptions.builder()
.setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -398,7 +398,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testFlushWithBatchSizeEqualsZero() throws SQLException, IOException {
+ void testFlushWithBatchSizeEqualsZero() throws SQLException, IOException {
JdbcConnectorOptions jdbcOptions =
JdbcConnectorOptions.builder()
.setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -445,7 +445,7 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
+ void testInvalidConnectionInJdbcOutputFormat() throws IOException, SQLException {
JdbcConnectorOptions jdbcOptions =
JdbcConnectorOptions.builder()
.setDriverName(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -506,8 +506,8 @@ public class JdbcOutputFormatTest extends JdbcDataTestBase {
}
}
- @After
- public void clearOutputTable() throws Exception {
+ @AfterEach
+ void clearOutputTable() throws Exception {
Class.forName(DERBY_EBOOKSHOP_DB.getDriverClass());
try (Connection conn = DriverManager.getConnection(DERBY_EBOOKSHOP_DB.getUrl());
Statement stat = conn.createStatement()) {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
index 005f60c..1e9ce3a 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormatTest.java
@@ -34,10 +34,8 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.Serializable;
@@ -52,11 +50,10 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_ALL_BOOKS_S
import static org.apache.flink.connector.jdbc.JdbcTestFixture.SELECT_EMPTY;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test suite for {@link JdbcRowDataInputFormat}. */
-public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
-
- @Rule public ExpectedException thrown = ExpectedException.none();
+class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
private JdbcRowDataInputFormat inputFormat;
private static String[] fieldNames = new String[] {"id", "title", "author", "price", "qty"};
@@ -81,8 +78,8 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
.toArray(LogicalType[]::new),
fieldNames);
- @After
- public void tearDown() throws IOException {
+ @AfterEach
+ void tearDown() throws IOException {
if (inputFormat != null) {
inputFormat.close();
inputFormat.closeInputFormat();
@@ -91,91 +88,115 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testNoRowConverter() throws IOException {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("No row converter supplied");
- inputFormat =
- JdbcRowDataInputFormat.builder()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setQuery(SELECT_ALL_BOOKS)
- .build();
- inputFormat.openInputFormat();
+ void testNoRowConverter() {
+ assertThatThrownBy(
+ () -> {
+ inputFormat =
+ JdbcRowDataInputFormat.builder()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(SELECT_ALL_BOOKS)
+ .build();
+ inputFormat.openInputFormat();
+ })
+ .isInstanceOf(NullPointerException.class);
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDriver() throws IOException {
- inputFormat =
- JdbcRowDataInputFormat.builder()
- .setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setQuery(SELECT_ALL_BOOKS)
- .setRowConverter(dialect.getRowConverter(rowType))
- .build();
- inputFormat.openInputFormat();
+ @Test
+ void testInvalidDriver() {
+ assertThatThrownBy(
+ () -> {
+ inputFormat =
+ JdbcRowDataInputFormat.builder()
+ .setDrivername("org.apache.derby.jdbc.idontexist")
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowConverter(dialect.getRowConverter(rowType))
+ .build();
+ inputFormat.openInputFormat();
+ })
+ .isInstanceOf(IllegalArgumentException.class);
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidURL() throws IOException {
- inputFormat =
- JdbcRowDataInputFormat.builder()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
- .setQuery(SELECT_ALL_BOOKS)
- .setRowConverter(dialect.getRowConverter(rowType))
- .build();
- inputFormat.openInputFormat();
+ @Test
+ void testInvalidURL() {
+ assertThatThrownBy(
+ () -> {
+ inputFormat =
+ JdbcRowDataInputFormat.builder()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowConverter(dialect.getRowConverter(rowType))
+ .build();
+ inputFormat.openInputFormat();
+ })
+ .isInstanceOf(IllegalArgumentException.class);
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() throws IOException {
- inputFormat =
- JdbcRowDataInputFormat.builder()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setQuery("iamnotsql")
- .setRowConverter(dialect.getRowConverter(rowType))
- .build();
- inputFormat.openInputFormat();
+ @Test
+ void testInvalidQuery() {
+ assertThatThrownBy(
+ () -> {
+ inputFormat =
+ JdbcRowDataInputFormat.builder()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery("iamnotsql")
+ .setRowConverter(dialect.getRowConverter(rowType))
+ .build();
+ inputFormat.openInputFormat();
+ })
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testNoQuery() throws IOException {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("No query supplied");
- inputFormat =
- JdbcRowDataInputFormat.builder()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setRowConverter(dialect.getRowConverter(rowType))
- .build();
+ void testNoQuery() {
+ assertThatThrownBy(
+ () -> {
+ inputFormat =
+ JdbcRowDataInputFormat.builder()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setRowConverter(dialect.getRowConverter(rowType))
+ .build();
+ })
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("No query supplied");
}
@Test
- public void testNoUrl() throws IOException {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("jdbc url is empty");
- inputFormat =
- JdbcRowDataInputFormat.builder()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setQuery(SELECT_ALL_BOOKS)
- .setRowConverter(dialect.getRowConverter(rowType))
- .build();
+ void testNoUrl() {
+ assertThatThrownBy(
+ () -> {
+ inputFormat =
+ JdbcRowDataInputFormat.builder()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setQuery(SELECT_ALL_BOOKS)
+ .setRowConverter(dialect.getRowConverter(rowType))
+ .build();
+ })
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("jdbc url is empty");
}
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidFetchSize() {
- inputFormat =
- JdbcRowDataInputFormat.builder()
- .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
- .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
- .setQuery(SELECT_ALL_BOOKS)
- .setFetchSize(-7)
- .build();
+ @Test
+ void testInvalidFetchSize() {
+ assertThatThrownBy(
+ () -> {
+ inputFormat =
+ JdbcRowDataInputFormat.builder()
+ .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(SELECT_ALL_BOOKS)
+ .setFetchSize(-7)
+ .build();
+ })
+ .isInstanceOf(IllegalArgumentException.class);
}
@Test
- public void testValidFetchSizeIntegerMin() {
+ void testValidFetchSizeIntegerMin() {
inputFormat =
JdbcRowDataInputFormat.builder()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -187,7 +208,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcInputFormatWithoutParallelism() throws IOException {
+ void testJdbcInputFormatWithoutParallelism() throws IOException {
inputFormat =
JdbcRowDataInputFormat.builder()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
@@ -215,7 +236,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
+ void testJdbcInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
final int fetchSize = 1;
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
@@ -253,8 +274,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting()
- throws IOException {
+ void testJdbcInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - 1].id;
final long fetchSize = max + 1; // generate a single split
@@ -292,7 +312,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException {
+ void testJdbcInputFormatWithParallelismAndGenericSplitting() throws IOException {
Serializable[][] queryParameters = new String[2][1];
queryParameters[0] = new String[] {TEST_DATA[3].author};
queryParameters[1] = new String[] {TEST_DATA[0].author};
@@ -339,7 +359,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
}
@Test
- public void testEmptyResults() throws IOException {
+ void testEmptyResults() throws IOException {
inputFormat =
JdbcRowDataInputFormat.builder()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
index c6cd608..72b083e 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
@@ -41,7 +41,7 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB
import static org.assertj.core.api.Assertions.assertThat;
/** Test suite for {@link JdbcRowDataLookupFunction}. */
-public class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
+class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
private static final String[] fieldNames = new String[] {"id1", "id2", "comment1", "comment2"};
private static final DataType[] fieldDataTypes =
@@ -53,7 +53,7 @@ public class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
@ParameterizedTest(name = "withFailure = {0}")
@ValueSource(booleans = {false, true})
- public void testLookup(boolean withFailure) throws Exception {
+ void testLookup(boolean withFailure) throws Exception {
JdbcRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(withFailure);
ListOutputCollector collector = new ListOutputCollector();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
index adac5b2..055e028 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
/** Plan tests for JDBC connector, for example, testing projection push down. */
public class JdbcTablePlanTest extends TableTestBase {
-
+ // TODO: Update to junit5 after TableTestBase migrated (maybe copy the class?)
private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault());
@Before
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
index d62da01..af03459 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
@@ -29,12 +29,13 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.math.BigDecimal;
@@ -56,7 +57,8 @@ import static org.assertj.core.api.Assertions.assertThat;
* Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL
* to mock a DB.
*/
-public class UnsignedTypeConversionITCase extends AbstractTestBase {
+@Testcontainers
+class UnsignedTypeConversionITCase extends AbstractTestBase {
private static final Logger LOGGER =
LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
@@ -96,8 +98,8 @@ public class UnsignedTypeConversionITCase extends AbstractTestBase {
new BigDecimal("18446744073709551615")
};
- @ClassRule
- public static final MySQLContainer<?> MYSQL_CONTAINER =
+ @Container
+ static final MySQLContainer<?> MYSQL_CONTAINER =
new MySQLContainer<>(MYSQL_57_IMAGE)
.withEnv(DEFAULT_CONTAINER_ENV_MAP)
.withUsername(USER)
@@ -106,7 +108,7 @@ public class UnsignedTypeConversionITCase extends AbstractTestBase {
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
@Test
- public void testUnsignedType() throws Exception {
+ void testUnsignedType() throws Exception {
try (Connection con =
DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), USER, PASSWORD)) {
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
index cf5ef28..b4c6720 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/utils/JdbcTypeUtilTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.utils;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.sql.Types;
@@ -29,10 +29,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Testing the type conversions from Flink to SQL types. */
-public class JdbcTypeUtilTest {
+class JdbcTypeUtilTest {
@Test
- public void testTypeConversions() {
+ void testTypeConversions() {
assertThat(logicalTypeToSqlType(LogicalTypeRoot.INTEGER)).isEqualTo(Types.INTEGER);
testUnsupportedType(LogicalTypeRoot.RAW);
testUnsupportedType(LogicalTypeRoot.MAP);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index 6da62e4..5c5867b 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -42,18 +42,18 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.LogLevelRule;
import org.apache.flink.util.function.SerializableSupplier;
import com.mysql.cj.jdbc.MysqlXADataSource;
import oracle.jdbc.xa.client.OracleXADataSource;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.postgresql.xa.PGXADataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,10 +92,9 @@ import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingO
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.slf4j.event.Level.TRACE;
/** A simple end-to-end test for {@link JdbcXaSinkFunction}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
private static final Random RANDOM = new Random(System.currentTimeMillis());
@@ -104,14 +103,6 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
private static final long CHECKPOINT_TIMEOUT_MS = 20_000L;
private static final long TASK_CANCELLATION_TIMEOUT_MS = 20_000L;
- // todo: remove after fixing FLINK-22889
- @ClassRule
- public static final LogLevelRule TEST_LOG_LEVEL_RULE =
- new LogLevelRule()
- .set(JdbcExactlyOnceSinkE2eTest.class, TRACE)
- .set(XaFacadeImpl.class, TRACE)
- .set(MySqlJdbcExactlyOnceSinkTestEnv.InnoDbStatusLogger.class, TRACE);
-
private interface JdbcExactlyOnceSinkTestEnv {
void start();
@@ -124,7 +115,7 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
int getParallelism();
}
- @Parameterized.Parameter public JdbcExactlyOnceSinkTestEnv dbEnv;
+ @Parameter public JdbcExactlyOnceSinkTestEnv dbEnv;
private MiniClusterWithClientResource cluster;
@@ -138,7 +129,7 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
// not using SharedObjects because we want to explicitly control which tag (attempt) to use
private static final Map<Integer, CountDownLatch> inactiveMappers = new ConcurrentHashMap<>();
- @Parameterized.Parameters(name = "{0}")
+ @Parameters(name = "{0}")
public static Collection<JdbcExactlyOnceSinkTestEnv> parameters() {
return Arrays.asList(
// PGSQL: check for issues with suspending connections (requires pooling) and
@@ -154,7 +145,7 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
);
}
- @Before
+ @BeforeEach
public void before() throws Exception {
Configuration configuration = new Configuration();
// single failover region to allow checkpointing even after some sources have finished and
@@ -177,7 +168,7 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
super.before();
}
- @After
+ @AfterEach
@Override
public void after() {
// no need for cleanup - done by test container tear down
@@ -190,8 +181,8 @@ public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
inactiveMappers.clear();
}
- @Test
- public void testInsert() throws Exception {
+ @TestTemplate
+ void testInsert() throws Exception {
long started = System.currentTimeMillis();
LOG.info("Test insert for {}", dbEnv);
int elementsPerSource = 50;
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
index 18e05e2..3ba4aba 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaFacadeImplTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.connector.jdbc.DbMetadata;
import org.apache.flink.connector.jdbc.JdbcTestBase;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** {@link XaFacadeImpl} tests. */
-public class JdbcXaFacadeImplTest extends JdbcTestBase {
+class JdbcXaFacadeImplTest extends JdbcTestBase {
private static final Xid XID =
new Xid() {
@@ -58,7 +58,7 @@ public class JdbcXaFacadeImplTest extends JdbcTestBase {
};
@Test
- public void testRecover() throws Exception {
+ void testRecover() throws Exception {
try (XaFacade f = XaFacadeImpl.fromXaDataSource(getDbMetadata().buildXaDataSource())) {
f.open();
assertThat(f.recover()).isEmpty();
@@ -80,7 +80,7 @@ public class JdbcXaFacadeImplTest extends JdbcTestBase {
}
@Test
- public void testClose() throws Exception {
+ void testClose() throws Exception {
// some drivers (derby, H2) close both connection on either
// connection.close/xaConnection.close() call, so use mocks here to:
// a) prevent closing XA connection from connection.close()
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
index 39eef2c..c642b1f 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkDerbyTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.connector.jdbc.DbMetadata;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.derby.jdbc.EmbeddedXADataSource;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.assertj.core.api.Assertions.assertThat;
@@ -31,14 +31,14 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
* {@link JdbcXaSinkFunction} tests using Derby DB. Derby supports XA but doesn't use MVCC, so we
* can't check anything before all transactions are completed.
*/
-public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
+class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
/**
* checkpoint > capture state > emit > snapshot > close > init(captured state), open > emit >
* checkpoint.
*/
@Test
- public void noDuplication() throws Exception {
+ void noDuplication() throws Exception {
sinkHelper.notifyCheckpointComplete(0);
TestXaSinkStateHandler newState = new TestXaSinkStateHandler();
newState.store(sinkHelper.getState().load(null));
@@ -50,7 +50,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testTxEndedOnClose() throws Exception {
+ void testTxEndedOnClose() throws Exception {
sinkHelper.emit(
TEST_DATA[0]); // don't snapshotState to prevent transaction from being prepared
sinkHelper.close();
@@ -58,7 +58,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testTxRollbackOnStartup() throws Exception {
+ void testTxRollbackOnStartup() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
xaHelper.assertPreparedTxCountEquals(1);
sinkHelper.close();
@@ -73,7 +73,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testRestoreWithNotificationMissing() throws Exception {
+ void testRestoreWithNotificationMissing() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.close();
sinkHelper = buildSinkHelper(sinkHelper.getState());
@@ -82,7 +82,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testCommitUponStart() throws Exception {
+ void testCommitUponStart() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.close();
buildAndInit(0, XaFacadeImpl.fromXaDataSource(xaDataSource), sinkHelper.getState());
@@ -91,12 +91,12 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
/** RM may return {@link javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} error. */
@Test
- public void testEmptyCheckpoint() throws Exception {
+ void testEmptyCheckpoint() throws Exception {
sinkHelper.snapshotState(0);
}
@Test
- public void testTwoCheckpointsComplete1st() throws Exception {
+ void testTwoCheckpointsComplete1st() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
JdbcXaSinkTestHelper sinkHelper = this.sinkHelper;
@@ -107,7 +107,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testTwoCheckpointsComplete2nd() throws Exception {
+ void testTwoCheckpointsComplete2nd() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP1);
xaHelper.assertDbContentsEquals(
@@ -117,7 +117,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testTwoCheckpointsCompleteBoth() throws Exception {
+ void testTwoCheckpointsCompleteBoth() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP0.id);
@@ -129,7 +129,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testTwoCheckpointsCompleteBothOutOfOrder() throws Exception {
+ void testTwoCheckpointsCompleteBothOutOfOrder() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP1);
sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP1.id);
@@ -141,7 +141,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testRestore() throws Exception {
+ void testRestore() throws Exception {
sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP0);
sinkHelper.close();
sinkHelper = new JdbcXaSinkTestHelper(buildAndInit(), new TestXaSinkStateHandler());
@@ -150,7 +150,7 @@ public class JdbcXaSinkDerbyTest extends JdbcXaSinkTestBase {
}
@Test
- public void testFailurePropagation() throws Exception {
+ void testFailurePropagation() throws Exception {
/* big enough flush interval to cause error in snapshotState rather than in invoke*/
sinkHelper =
new JdbcXaSinkTestHelper(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
index c869026..b9abcba 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkH2Test.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.jdbc.xa;
import org.apache.flink.connector.jdbc.DbMetadata;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.assertj.core.api.Assertions.assertThat;
@@ -30,22 +30,22 @@ import static org.assertj.core.api.Assertions.assertThat;
* transaction is not yet committed). But XA support isn't full, so for some scenarios {@link
* org.apache.flink.connector.jdbc.xa.h2.H2XaDsWrapper wrapper} is used, and for some - Derby.
*/
-public class JdbcXaSinkH2Test extends JdbcXaSinkTestBase {
+class JdbcXaSinkH2Test extends JdbcXaSinkTestBase {
@Test
- public void testIgnoreDuplicatedNotification() throws Exception {
+ void testIgnoreDuplicatedNotification() throws Exception {
sinkHelper.emitAndCheckpoint(JdbcTestFixture.CP0);
sinkHelper.notifyCheckpointComplete(JdbcTestFixture.CP0.id);
}
/** RM may return {@link javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} error. */
@Test
- public void testEmptyCheckpoint() throws Exception {
+ void testEmptyCheckpoint() throws Exception {
sinkHelper.snapshotState(0);
}
@Test
- public void testHappyFlow() throws Exception {
+ void testHappyFlow() throws Exception {
sinkHelper.emit(TEST_DATA[0]);
assertThat(xaHelper.countInDb())
.as("record should not be inserted before the checkpoint started.")
@@ -63,7 +63,7 @@ public class JdbcXaSinkH2Test extends JdbcXaSinkTestBase {
}
@Test
- public void testTwoCheckpointsWithoutData() throws Exception {
+ void testTwoCheckpointsWithoutData() throws Exception {
JdbcXaSinkTestHelper sinkHelper = this.sinkHelper;
sinkHelper.snapshotState(1);
sinkHelper.snapshotState(2);
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
index 3466e85..04e3576 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Preconditions;
-import org.junit.After;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import javax.transaction.xa.Xid;
@@ -54,7 +56,7 @@ import static org.apache.flink.streaming.util.OperatorSnapshotUtil.readStateHand
import static org.apache.flink.streaming.util.OperatorSnapshotUtil.writeStateHandle;
/** Tests state migration for {@link JdbcXaSinkFunction}. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
public class JdbcXaSinkMigrationTest extends JdbcTestBase {
// write a snapshot:
@@ -66,7 +68,7 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
writeSnapshot(parseVersionArg(args));
}
- @Parameterized.Parameters
+ @Parameters
public static Collection<FlinkVersion> getReadVersions() {
return Collections.emptyList();
}
@@ -77,8 +79,9 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
private final FlinkVersion readVersion;
- @Test
- public void testCommitFromSnapshot() throws Exception {
+ @TestTemplate
+ @Disabled // as getReadVersions is empty and fails
+ void testCommitFromSnapshot() throws Exception {
preparePendingTransaction();
try (OneInputStreamOperatorTestHarness<TestEntry, Object> harness =
createHarness(buildSink())) {
@@ -96,8 +99,8 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
}
}
- @After
- public void cleanUp() throws Exception {
+ @AfterEach
+ void cleanUp() throws Exception {
cancelAllTx();
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
index 4baf319..f1e4751 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkNoInsertionTest.java
@@ -20,16 +20,16 @@ package org.apache.flink.connector.jdbc.xa;
import org.apache.flink.connector.jdbc.DbMetadata;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests that data is not inserted ahead of time. */
-public class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
+class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
@Test
- public void testNoInsertAfterInvoke() throws Exception {
+ void testNoInsertAfterInvoke() throws Exception {
sinkHelper.emit(TEST_DATA[0]);
assertThat(xaHelper.countInDb())
.as("no records should be inserted for incomplete checkpoints.")
@@ -37,7 +37,7 @@ public class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
}
@Test
- public void testNoInsertAfterSnapshot() throws Exception {
+ void testNoInsertAfterSnapshot() throws Exception {
sinkHelper.emitAndSnapshot(JdbcTestFixture.CP0);
assertThat(xaHelper.countInDb())
.as("no records should be inserted for incomplete checkpoints.")
@@ -52,7 +52,7 @@ public class JdbcXaSinkNoInsertionTest extends JdbcXaSinkTestBase {
}
@Test
- public void testNoInsertAfterFacadeClose() throws Exception {
+ void testNoInsertAfterFacadeClose() throws Exception {
try (XaFacadeImpl xaFacade = XaFacadeImpl.fromXaDataSource(xaDataSource)) {
sinkHelper =
new JdbcXaSinkTestHelper(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
index efaf4b2..3a688aa 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
@@ -52,8 +52,8 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import javax.sql.XADataSource;
import javax.transaction.xa.Xid;
@@ -75,14 +75,14 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
* // todo: javadoc case Base class for {@link JdbcXaSinkFunction} tests. In addition to {@link
* JdbcTestBase} init it initializes/closes helpers.
*/
-public abstract class JdbcXaSinkTestBase extends JdbcTestBase {
+abstract class JdbcXaSinkTestBase extends JdbcTestBase {
JdbcXaFacadeTestHelper xaHelper;
JdbcXaSinkTestHelper sinkHelper;
XADataSource xaDataSource;
- @Before
- public void initHelpers() throws Exception {
+ @BeforeEach
+ void initHelpers() throws Exception {
xaDataSource = getDbMetadata().buildXaDataSource();
xaHelper =
new JdbcXaFacadeTestHelper(
@@ -98,8 +98,8 @@ public abstract class JdbcXaSinkTestBase extends JdbcTestBase {
return new TestXaSinkStateHandler();
}
- @After
- public void closeHelpers() throws Exception {
+ @AfterEach
+ void closeHelpers() throws Exception {
if (sinkHelper != null) {
sinkHelper.close();
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
index 63fe55c..c105a91 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/SemanticXidGeneratorTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.xa;
import org.apache.flink.api.common.JobID;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import javax.transaction.xa.Xid;
@@ -31,18 +31,18 @@ import static org.apache.flink.connector.jdbc.xa.JdbcXaSinkTestBase.TEST_RUNTIME
import static org.assertj.core.api.Assertions.assertThat;
/** Simple uniqueness tests for the {@link SemanticXidGenerator}. */
-public class SemanticXidGeneratorTest {
+class SemanticXidGeneratorTest {
private static final int COUNT = 100_000;
@Test
- public void testXidsUniqueAmongCheckpoints() {
+ void testXidsUniqueAmongCheckpoints() {
SemanticXidGenerator xidGenerator = new SemanticXidGenerator();
xidGenerator.open();
checkUniqueness(checkpoint -> xidGenerator.generateXid(TEST_RUNTIME_CONTEXT, checkpoint));
}
@Test
- public void testXidsUniqueAmongJobs() {
+ void testXidsUniqueAmongJobs() {
long checkpointId = 1L;
SemanticXidGenerator generator = new SemanticXidGenerator();
checkUniqueness(
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidImplTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidImplTest.java
index 33a2053..58ca865 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidImplTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidImplTest.java
@@ -17,7 +17,7 @@
package org.apache.flink.connector.jdbc.xa;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.util.Random;
@@ -26,11 +26,11 @@ import static javax.transaction.xa.Xid.MAXGTRIDSIZE;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link XidImpl}. */
-public class XidImplTest {
+class XidImplTest {
static final XidImpl XID = new XidImpl(1, randomBytes(MAXGTRIDSIZE), randomBytes(MAXBQUALSIZE));
@Test
- public void testXidsEqual() {
+ void testXidsEqual() {
XidImpl other =
new XidImpl(
XID.getFormatId(), XID.getGlobalTransactionId(), XID.getBranchQualifier());
@@ -38,7 +38,7 @@ public class XidImplTest {
}
@Test
- public void testXidsNotEqual() {
+ void testXidsNotEqual() {
assertThat(new XidImpl(0, XID.getGlobalTransactionId(), XID.getBranchQualifier()))
.isNotEqualTo(XID);
assertThat(