You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/03/10 16:17:06 UTC
[flink] 08/10: [FLINK-15782][connectors/jdbc] refactor JDBC sink
tests Changes: 1. extract DbMetadata to allow to use different databases 2.
extract test fixture from the base class 3. use inheritance only for
@Before/@After behaviour
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 40cab49012e2a54db36da2b44b951a237057bd49
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Jan 14 00:43:30 2020 +0100
[FLINK-15782][connectors/jdbc] refactor JDBC sink tests
Changes:
1. extract DbMetadata to allow to use different databases
2. extract test fixture from the base class
3. use inheritance only for @Before/@After behaviour
---
.../apache/flink/api/java/io/jdbc/DbMetadata.java | 36 +++++
.../flink/api/java/io/jdbc/DerbyDbMetadata.java | 64 ++++++++
.../flink/api/java/io/jdbc/JDBCDataTestBase.java | 37 +++++
.../flink/api/java/io/jdbc/JDBCFullTest.java | 43 +++---
.../api/java/io/jdbc/JDBCInputFormatTest.java | 78 +++++-----
.../api/java/io/jdbc/JDBCLookupFunctionITCase.java | 8 +-
.../api/java/io/jdbc/JDBCOutputFormatTest.java | 61 ++++----
.../flink/api/java/io/jdbc/JDBCTestBase.java | 164 +++------------------
.../flink/api/java/io/jdbc/JDBCTestCheckpoint.java | 35 +++++
.../java/io/jdbc/JDBCUpsertTableSinkITCase.java | 7 +-
.../java/io/jdbc/JdbcTableOutputFormatTest.java | 15 +-
.../{JDBCTestBase.java => JdbcTestFixture.java} | 123 +++++++++-------
12 files changed, 380 insertions(+), 291 deletions(-)
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DbMetadata.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DbMetadata.java
new file mode 100644
index 0000000..14731e4
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DbMetadata.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import javax.sql.XADataSource;
+
+import java.io.Serializable;
+
+/**
+ * Describes a database: driver, schema and urls.
+ */
+public interface DbMetadata extends Serializable {
+
+ String getInitUrl();
+
+ String getUrl();
+
+ XADataSource buildXaDataSource();
+
+ String getDriverClass();
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyDbMetadata.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyDbMetadata.java
new file mode 100644
index 0000000..79a62bc
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyDbMetadata.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.derby.jdbc.EmbeddedXADataSource;
+
+import javax.sql.XADataSource;
+
+/**
+ * DerbyDbMetadata.
+ */
+public class DerbyDbMetadata implements DbMetadata {
+ private final String dbName;
+ private final String dbInitUrl;
+ private final String url;
+
+ public DerbyDbMetadata(String schemaName) {
+ dbName = "memory:" + schemaName;
+ url = "jdbc:derby:" + dbName;
+ dbInitUrl = url + ";create=true";
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ @Override
+ public String getInitUrl() {
+ return dbInitUrl;
+ }
+
+ @Override
+ public XADataSource buildXaDataSource() {
+ EmbeddedXADataSource ds = new EmbeddedXADataSource();
+ ds.setDatabaseName(dbName);
+ return ds;
+ }
+
+ @Override
+ public String getDriverClass() {
+ return "org.apache.derby.jdbc.EmbeddedDriver";
+ }
+
+ @Override
+ public String getUrl() {
+ return url;
+ }
+
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCDataTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCDataTestBase.java
new file mode 100644
index 0000000..8fb1539
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCDataTestBase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.junit.Before;
+
+import java.sql.SQLException;
+
+/**
+ * Base class for JDBC test using data from {@link JdbcTestFixture}. It uses {@link DerbyDbMetadata} and inserts data before each test.
+ */
+abstract class JDBCDataTestBase extends JDBCTestBase {
+ @Before
+ public void initData() throws SQLException {
+ JdbcTestFixture.initData(getDbMetadata());
+ }
+
+ @Override
+ protected DbMetadata getDbMetadata() {
+ return JdbcTestFixture.DERBY_EBOOKSHOP_DB;
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index 14dc85a..28ac1a1 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -26,7 +26,9 @@ import org.apache.flink.types.Row;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -35,12 +37,17 @@ import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Types;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.OUTPUT_TABLE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.ROW_TYPE_INFO;
import static org.hamcrest.core.StringContains.containsString;
/**
* Tests using both {@link JDBCInputFormat} and {@link JDBCOutputFormat}.
*/
-public class JDBCFullTest extends JDBCTestBase {
+public class JDBCFullTest extends JDBCDataTestBase {
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
@Test
public void testWithoutParallelism() throws Exception {
@@ -58,8 +65,8 @@ public class JDBCFullTest extends JDBCTestBase {
exception.expectMessage(containsString("field index: 3, field value: 11.11."));
JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(JDBCTestBase.DRIVER_CLASS)
- .setDBUrl(JDBCTestBase.DB_URL)
+ .setDrivername(getDbMetadata().getDriverClass())
+ .setDBUrl(getDbMetadata().getUrl())
.setQuery("insert into newbooks (id, title, author, price, qty) values (?,?,?,?,?)")
.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
.finish();
@@ -73,18 +80,18 @@ public class JDBCFullTest extends JDBCTestBase {
private void runTest(boolean exploitParallelism) throws Exception {
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(JDBCTestBase.DRIVER_CLASS)
- .setDBUrl(JDBCTestBase.DB_URL)
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
+ .setDrivername(getDbMetadata().getDriverClass())
+ .setDBUrl(getDbMetadata().getUrl())
+ .setQuery(JdbcTestFixture.SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO);
if (exploitParallelism) {
final int fetchSize = 1;
- final long min = JDBCTestBase.TEST_DATA[0].id;
- final long max = JDBCTestBase.TEST_DATA[JDBCTestBase.TEST_DATA.length - fetchSize].id;
+ final long min = JdbcTestFixture.TEST_DATA[0].id;
+ final long max = JdbcTestFixture.TEST_DATA[JdbcTestFixture.TEST_DATA.length - fetchSize].id;
//use a "splittable" query to exploit parallelism
inputBuilder = inputBuilder
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+ .setQuery(JdbcTestFixture.SELECT_ALL_BOOKS_SPLIT_BY_ID)
.setParametersProvider(new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize));
}
DataSet<Row> source = environment.createInput(inputBuilder.finish());
@@ -93,8 +100,8 @@ public class JDBCFullTest extends JDBCTestBase {
//some databases don't null values correctly when no column type was specified
//in PreparedStatement.setObject (see its javadoc for more details)
source.output(JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(JDBCTestBase.DRIVER_CLASS)
- .setDBUrl(JDBCTestBase.DB_URL)
+ .setDrivername(getDbMetadata().getDriverClass())
+ .setDBUrl(getDbMetadata().getUrl())
.setQuery("insert into newbooks (id, title, author, price, qty) values (?,?,?,?,?)")
.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
.finish());
@@ -102,24 +109,24 @@ public class JDBCFullTest extends JDBCTestBase {
environment.execute();
try (
- Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
- PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
- ResultSet resultSet = statement.executeQuery()
+ Connection dbConn = DriverManager.getConnection(getDbMetadata().getUrl());
+ PreparedStatement statement = dbConn.prepareStatement(JdbcTestFixture.SELECT_ALL_NEWBOOKS);
+ ResultSet resultSet = statement.executeQuery()
) {
int count = 0;
while (resultSet.next()) {
count++;
}
- Assert.assertEquals(JDBCTestBase.TEST_DATA.length, count);
+ Assert.assertEquals(JdbcTestFixture.TEST_DATA.length, count);
}
}
@After
public void clearOutputTable() throws Exception {
- Class.forName(DRIVER_CLASS);
+ Class.forName(getDbMetadata().getDriverClass());
try (
- Connection conn = DriverManager.getConnection(DB_URL);
- Statement stat = conn.createStatement()) {
+ Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
+ Statement stat = conn.createStatement()) {
stat.execute("DELETE FROM " + OUTPUT_TABLE);
stat.close();
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 559976d..ca4f8e3 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.io.jdbc;
+import org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TestEntry;
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
@@ -34,10 +35,15 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.ROW_TYPE_INFO;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.SELECT_ALL_BOOKS;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.SELECT_EMPTY;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TEST_DATA;
+
/**
* Tests for the {@link JDBCInputFormat}.
*/
-public class JDBCInputFormatTest extends JDBCTestBase {
+public class JDBCInputFormatTest extends JDBCDataTestBase {
private JDBCInputFormat jdbcInputFormat;
@@ -54,7 +60,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
public void testUntypedRowInfo() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl(DB_URL)
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.finish();
jdbcInputFormat.openInputFormat();
@@ -64,7 +70,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
public void testInvalidDriver() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl(DB_URL)
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.finish();
@@ -74,7 +80,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
@Test(expected = IllegalArgumentException.class)
public void testInvalidURL() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
@@ -85,8 +91,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
@Test(expected = IllegalArgumentException.class)
public void testInvalidQuery() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery("iamnotsql")
.setRowTypeInfo(ROW_TYPE_INFO)
.finish();
@@ -96,7 +102,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
@Test(expected = IllegalArgumentException.class)
public void testIncompleteConfiguration() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.finish();
@@ -105,8 +111,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
@Test(expected = IllegalArgumentException.class)
public void testInvalidFetchSize() {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.setFetchSize(-7)
@@ -116,8 +122,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
@Test
public void testValidFetchSizeIntegerMin() {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.setFetchSize(Integer.MIN_VALUE)
@@ -127,15 +133,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
@Test
public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.finish();
jdbcInputFormat.openInputFormat();
- Class.forName(DRIVER_CLASS);
- final int defaultFetchSize = DriverManager.getConnection(DB_URL).createStatement().getFetchSize();
+ Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
+ final int defaultFetchSize = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl()).createStatement().getFetchSize();
Assert.assertEquals(defaultFetchSize, jdbcInputFormat.getStatement().getFetchSize());
}
@@ -144,8 +150,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
public void testFetchSizeCanBeConfigured() throws SQLException {
final int desiredFetchSize = 10_000;
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.setFetchSize(desiredFetchSize)
@@ -158,15 +164,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
public void testDefaultAutoCommitIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.finish();
jdbcInputFormat.openInputFormat();
- Class.forName(DRIVER_CLASS);
- final boolean defaultAutoCommit = DriverManager.getConnection(DB_URL).getAutoCommit();
+ Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
+ final boolean defaultAutoCommit = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl()).getAutoCommit();
Assert.assertEquals(defaultAutoCommit, jdbcInputFormat.getDbConn().getAutoCommit());
@@ -177,8 +183,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
final boolean desiredAutoCommit = false;
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.setAutoCommit(desiredAutoCommit)
@@ -192,8 +198,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
@Test
public void testJDBCInputFormatWithoutParallelism() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowTypeInfo(ROW_TYPE_INFO)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
@@ -223,9 +229,9 @@ public class JDBCInputFormatTest extends JDBCTestBase {
final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(JdbcTestFixture.SELECT_ALL_BOOKS_SPLIT_BY_ID)
.setRowTypeInfo(ROW_TYPE_INFO)
.setParametersProvider(pramProvider)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
@@ -259,9 +265,9 @@ public class JDBCInputFormatTest extends JDBCTestBase {
final long fetchSize = max + 1; //generate a single split
ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(JdbcTestFixture.SELECT_ALL_BOOKS_SPLIT_BY_ID)
.setRowTypeInfo(ROW_TYPE_INFO)
.setParametersProvider(pramProvider)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
@@ -295,9 +301,9 @@ public class JDBCInputFormatTest extends JDBCTestBase {
queryParameters[1] = new String[]{TEST_DATA[0].author};
ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
- .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
+ .setQuery(JdbcTestFixture.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
.setRowTypeInfo(ROW_TYPE_INFO)
.setParametersProvider(paramProvider)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
@@ -335,8 +341,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
@Test
public void testEmptyResults() throws IOException {
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_EMPTY)
.setRowTypeInfo(ROW_TYPE_INFO)
.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
index 16a0eeb..dabd88c 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
@@ -45,8 +45,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import static org.apache.flink.api.java.io.jdbc.JDBCTestBase.DRIVER_CLASS;
-
/**
* IT case for {@link JDBCLookupFunction}.
*/
@@ -69,9 +67,9 @@ public class JDBCLookupFunctionITCase extends AbstractTestBase {
@Before
public void before() throws ClassNotFoundException, SQLException {
- System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+ System.setProperty("derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
- Class.forName(DRIVER_CLASS);
+ Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
try (
Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
Statement stat = conn.createStatement()) {
@@ -123,7 +121,7 @@ public class JDBCLookupFunctionITCase extends AbstractTestBase {
@After
public void clearOutputTable() throws Exception {
- Class.forName(DRIVER_CLASS);
+ Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
try (
Connection conn = DriverManager.getConnection(DB_URL);
Statement stat = conn.createStatement()) {
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 104fb87..cc9a92a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -32,13 +32,18 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.INPUT_TABLE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.OUTPUT_TABLE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.OUTPUT_TABLE_2;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TEST_DATA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
* Tests for the {@link JDBCOutputFormat}.
*/
-public class JDBCOutputFormatTest extends JDBCTestBase {
+public class JDBCOutputFormatTest extends JDBCDataTestBase {
private JDBCOutputFormat jdbcOutputFormat;
@@ -54,7 +59,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
public void testInvalidDriver() throws IOException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.apache.derby.jdbc.idontexist")
- .setDBUrl(DB_URL)
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
.finish();
jdbcOutputFormat.open(0, 1);
@@ -63,7 +68,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
@Test(expected = IOException.class)
public void testInvalidURL() throws IOException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
.finish();
@@ -73,8 +78,8 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
@Test(expected = IOException.class)
public void testInvalidQuery() throws IOException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery("iamnotsql")
.finish();
jdbcOutputFormat.open(0, 1);
@@ -83,7 +88,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
@Test(expected = NullPointerException.class)
public void testIncompleteConfiguration() {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
.finish();
}
@@ -91,8 +96,8 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
@Test(expected = RuntimeException.class)
public void testIncompatibleTypes() throws IOException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
.finish();
jdbcOutputFormat.open(0, 1);
@@ -111,8 +116,8 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
@Test(expected = RuntimeException.class)
public void testExceptionOnInvalidType() throws IOException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
.setSqlTypes(new int[] {
Types.INTEGER,
@@ -123,7 +128,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
.finish();
jdbcOutputFormat.open(0, 1);
- JDBCTestBase.TestEntry entry = TEST_DATA[0];
+ JdbcTestFixture.TestEntry entry = TEST_DATA[0];
Row row = new Row(5);
row.setField(0, entry.id);
row.setField(1, entry.title);
@@ -137,8 +142,8 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
public void testExceptionOnClose() throws IOException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
.setSqlTypes(new int[] {
Types.INTEGER,
@@ -149,7 +154,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
.finish();
jdbcOutputFormat.open(0, 1);
- JDBCTestBase.TestEntry entry = TEST_DATA[0];
+ JdbcTestFixture.TestEntry entry = TEST_DATA[0];
Row row = new Row(5);
row.setField(0, entry.id);
row.setField(1, entry.title);
@@ -165,22 +170,22 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
@Test
public void testJDBCOutputFormat() throws IOException, SQLException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
.finish();
jdbcOutputFormat.open(0, 1);
- for (JDBCTestBase.TestEntry entry : TEST_DATA) {
+ for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
jdbcOutputFormat.writeRecord(toRow(entry));
}
jdbcOutputFormat.close();
try (
- Connection dbConn = DriverManager.getConnection(DB_URL);
- PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
- ResultSet resultSet = statement.executeQuery()
+ Connection dbConn = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl());
+ PreparedStatement statement = dbConn.prepareStatement(JdbcTestFixture.SELECT_ALL_NEWBOOKS);
+ ResultSet resultSet = statement.executeQuery()
) {
int recordCount = 0;
while (resultSet.next()) {
@@ -199,14 +204,14 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
@Test
public void testFlush() throws SQLException, IOException {
jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
- .setDrivername(DRIVER_CLASS)
- .setDBUrl(DB_URL)
+ .setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+ .setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_2))
.setBatchInterval(3)
.finish();
try (
- Connection dbConn = DriverManager.getConnection(DB_URL);
- PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS_2)
+ Connection dbConn = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl());
+ PreparedStatement statement = dbConn.prepareStatement(JdbcTestFixture.SELECT_ALL_NEWBOOKS_2)
) {
jdbcOutputFormat.open(0, 1);
for (int i = 0; i < 2; ++i) {
@@ -235,10 +240,10 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
@After
public void clearOutputTable() throws Exception {
- Class.forName(DRIVER_CLASS);
+ Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
try (
- Connection conn = DriverManager.getConnection(DB_URL);
- Statement stat = conn.createStatement()) {
+ Connection conn = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl());
+ Statement stat = conn.createStatement()) {
stat.execute("DELETE FROM " + OUTPUT_TABLE);
stat.close();
@@ -246,7 +251,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
}
}
- static Row toRow(TestEntry entry) {
+ static Row toRow(JdbcTestFixture.TestEntry entry) {
Row row = new Row(5);
row.setField(0, entry.id);
row.setField(1, entry.title);
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 021d28f..ffd25e5 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -1,13 +1,12 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,148 +17,29 @@
package org.apache.flink.api.java.io.jdbc;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.junit.After;
+import org.junit.Before;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.ExpectedException;
-
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.cleanUpDatabasesStatic;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.cleanupData;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.initSchema;
/**
- * Base test class for JDBC Input and Output.
+ * Base class for JDBC test using DDL from {@link JdbcTestFixture}. It uses create tables before each test and drops afterwards.
*/
-public class JDBCTestBase {
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
- public static final String DB_URL = "jdbc:derby:memory:ebookshop";
- public static final String INPUT_TABLE = "books";
- public static final String OUTPUT_TABLE = "newbooks";
- public static final String OUTPUT_TABLE_2 = "newbooks2";
- public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
- public static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE;
- public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
- public static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + OUTPUT_TABLE_2;
- public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
- public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
- public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
- public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
-
- public static final TestEntry[] TEST_DATA = {
- new TestEntry(1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11),
- new TestEntry(1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22),
- new TestEntry(1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33),
- new TestEntry(1004, ("A Cup of Java"), ("Kumar"), 44.44, 44),
- new TestEntry(1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55),
- new TestEntry(1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66),
- new TestEntry(1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77),
- new TestEntry(1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88),
- new TestEntry(1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99),
- new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010)
- };
-
- static class TestEntry {
- protected final Integer id;
- protected final String title;
- protected final String author;
- protected final Double price;
- protected final Integer qty;
+public abstract class JDBCTestBase {
- private TestEntry(Integer id, String title, String author, Double price, Integer qty) {
- this.id = id;
- this.title = title;
- this.author = author;
- this.price = price;
- this.qty = qty;
- }
+ @Before
+ public final void before() throws Exception {
+ initSchema(getDbMetadata());
}
- public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO);
-
- public static String getCreateQuery(String tableName) {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
- sqlQueryBuilder.append(tableName).append(" (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
- return sqlQueryBuilder.toString();
- }
-
- public static String getInsertQuery() {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- for (int i = 0; i < TEST_DATA.length; i++) {
- sqlQueryBuilder.append("(")
- .append(TEST_DATA[i].id).append(",'")
- .append(TEST_DATA[i].title).append("','")
- .append(TEST_DATA[i].author).append("',")
- .append(TEST_DATA[i].price).append(",")
- .append(TEST_DATA[i].qty).append(")");
- if (i < TEST_DATA.length - 1) {
- sqlQueryBuilder.append(",");
- }
- }
- String insertQuery = sqlQueryBuilder.toString();
- return insertQuery;
+ @After
+ public final void after() throws Exception {
+ cleanupData(getDbMetadata().getUrl());
+ cleanUpDatabasesStatic(getDbMetadata());
}
- public static final OutputStream DEV_NULL = new OutputStream() {
- @Override
- public void write(int b) {
- }
- };
-
- @BeforeClass
- public static void prepareDerbyDatabase() throws Exception {
- System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+ protected abstract DbMetadata getDbMetadata();
- Class.forName(DRIVER_CLASS);
- try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true")) {
- createTable(conn, JDBCTestBase.INPUT_TABLE);
- createTable(conn, OUTPUT_TABLE);
- createTable(conn, OUTPUT_TABLE_2);
- insertDataIntoInputTable(conn);
- }
- }
-
- private static void createTable(Connection conn, String tableName) throws SQLException {
- Statement stat = conn.createStatement();
- stat.executeUpdate(getCreateQuery(tableName));
- stat.close();
- }
-
- private static void insertDataIntoInputTable(Connection conn) throws SQLException {
- Statement stat = conn.createStatement();
- stat.execute(getInsertQuery());
- stat.close();
- }
-
- @AfterClass
- public static void cleanUpDerbyDatabases() throws Exception {
- Class.forName(DRIVER_CLASS);
- try (
- Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
- Statement stat = conn.createStatement()) {
-
- stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
- stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
- stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2);
- }
- }
}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestCheckpoint.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestCheckpoint.java
new file mode 100644
index 0000000..6086011
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestCheckpoint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+/**
+ * Holds id and indices of items in {@link JdbcTestFixture#TEST_DATA}.
+ */
+public class JDBCTestCheckpoint {
+ public final long id;
+ public final int[] dataItemsIdx;
+
+ JDBCTestCheckpoint(long id, int... dataItemsIdx) {
+ this.id = id;
+ this.dataItemsIdx = dataItemsIdx;
+ }
+
+ public JDBCTestCheckpoint withCheckpointId(long id) {
+ return new JDBCTestCheckpoint(id, dataItemsIdx);
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
index 6f4f810..76f60bc 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
@@ -41,7 +41,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import static org.apache.flink.api.java.io.jdbc.JDBCTestBase.DRIVER_CLASS;
import static org.apache.flink.api.java.io.jdbc.JdbcTableOutputFormatTest.check;
/**
@@ -55,9 +54,9 @@ public class JDBCUpsertTableSinkITCase extends AbstractTestBase {
@Before
public void before() throws ClassNotFoundException, SQLException {
- System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+ System.setProperty("derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
- Class.forName(DRIVER_CLASS);
+ Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
try (
Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
Statement stat = conn.createStatement()) {
@@ -79,7 +78,7 @@ public class JDBCUpsertTableSinkITCase extends AbstractTestBase {
@After
public void clearOutputTable() throws Exception {
- Class.forName(DRIVER_CLASS);
+ Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
try (
Connection conn = DriverManager.getConnection(DB_URL);
Statement stat = conn.createStatement()) {
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTableOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTableOutputFormatTest.java
index 234657d..aaaa92c 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTableOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTableOutputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.io.jdbc;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TestEntry;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
@@ -39,13 +40,15 @@ import java.util.Arrays;
import java.util.List;
import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormatTest.toRow;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.OUTPUT_TABLE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TEST_DATA;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.doReturn;
/**
* Tests for the {@link JdbcBatchingOutputFormat}.
*/
-public class JdbcTableOutputFormatTest extends JDBCTestBase {
+public class JdbcTableOutputFormatTest extends JDBCDataTestBase {
private TableJdbcUpsertOutputFormat format;
private String[] fieldNames;
@@ -60,7 +63,7 @@ public class JdbcTableOutputFormatTest extends JDBCTestBase {
@Test
public void testJDBCOutputFormat() throws Exception {
JDBCOptions options = JDBCOptions.builder()
- .setDBUrl(DB_URL)
+ .setDBUrl(getDbMetadata().getUrl())
.setTableName(OUTPUT_TABLE)
.build();
JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
@@ -100,7 +103,7 @@ public class JdbcTableOutputFormatTest extends JDBCTestBase {
}
private void check(Row[] rows) throws SQLException {
- check(rows, DB_URL, OUTPUT_TABLE, fieldNames);
+ check(rows, getDbMetadata().getUrl(), OUTPUT_TABLE, fieldNames);
}
static void check(Row[] rows, String url, String table, String[] fields) throws SQLException {
@@ -131,10 +134,10 @@ public class JdbcTableOutputFormatTest extends JDBCTestBase {
format.close();
}
format = null;
- Class.forName(DRIVER_CLASS);
+ Class.forName(getDbMetadata().getDriverClass());
try (
- Connection conn = DriverManager.getConnection(DB_URL);
- Statement stat = conn.createStatement()) {
+ Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
+ Statement stat = conn.createStatement()) {
stat.execute("DELETE FROM " + OUTPUT_TABLE);
stat.close();
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTestFixture.java
similarity index 55%
copy from flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
copy to flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTestFixture.java
index 021d28f..071a2af 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTestFixture.java
@@ -21,38 +21,32 @@ package org.apache.flink.api.java.io.jdbc;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.ExpectedException;
-
import java.io.OutputStream;
+import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
/**
- * Base test class for JDBC Input and Output.
+ * Test data and helper objects for JDBC tests.
*/
-public class JDBCTestBase {
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
+@SuppressWarnings("SpellCheckingInspection")
+public class JdbcTestFixture {
+ public static final JDBCTestCheckpoint CP0 = new JDBCTestCheckpoint(0, 1, 2, 3);
+ public static final JDBCTestCheckpoint CP1 = new JDBCTestCheckpoint(1, 4, 5, 6);
- public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
- public static final String DB_URL = "jdbc:derby:memory:ebookshop";
public static final String INPUT_TABLE = "books";
- public static final String OUTPUT_TABLE = "newbooks";
- public static final String OUTPUT_TABLE_2 = "newbooks2";
- public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
- public static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE;
- public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
- public static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + OUTPUT_TABLE_2;
- public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
+ static final String OUTPUT_TABLE = "newbooks";
+ static final String OUTPUT_TABLE_2 = "newbooks2";
+ static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
+ static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE;
+ static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
+ static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + OUTPUT_TABLE_2;
+ static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
- public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
- public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
+ static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+ static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
public static final TestEntry[] TEST_DATA = {
new TestEntry(1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11),
@@ -67,12 +61,18 @@ public class JDBCTestBase {
new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010)
};
- static class TestEntry {
- protected final Integer id;
- protected final String title;
- protected final String author;
- protected final Double price;
- protected final Integer qty;
+ private static final String EBOOKSHOP_SCHEMA_NAME = "ebookshop";
+ public static final DerbyDbMetadata DERBY_EBOOKSHOP_DB = new DerbyDbMetadata(EBOOKSHOP_SCHEMA_NAME);
+
+ /**
+ * TestEntry.
+ */
+ public static class TestEntry implements Serializable {
+ public final Integer id;
+ public final String title;
+ public final String author;
+ public final Double price;
+ public final Integer qty;
private TestEntry(Integer id, String title, String author, Double price, Integer qty) {
this.id = id;
@@ -81,25 +81,34 @@ public class JDBCTestBase {
this.price = price;
this.qty = qty;
}
+
+ @Override
+ public String toString() {
+ return "TestEntry{" +
+ "id=" + id +
+ ", title='" + title + '\'' +
+ ", author='" + author + '\'' +
+ ", price=" + price +
+ ", qty=" + qty +
+ '}';
+ }
}
- public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
+ static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO);
- public static String getCreateQuery(String tableName) {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
- sqlQueryBuilder.append(tableName).append(" (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
- return sqlQueryBuilder.toString();
+ private static String getCreateQuery(String tableName) {
+ return "CREATE TABLE " + tableName + " (" +
+ "id INT NOT NULL DEFAULT 0," +
+ "title VARCHAR(50) DEFAULT NULL," +
+ "author VARCHAR(50) DEFAULT NULL," +
+ "price FLOAT DEFAULT NULL," +
+ "qty INT DEFAULT NULL," +
+ "PRIMARY KEY (id))";
}
public static String getInsertQuery() {
@@ -115,25 +124,28 @@ public class JDBCTestBase {
sqlQueryBuilder.append(",");
}
}
- String insertQuery = sqlQueryBuilder.toString();
- return insertQuery;
+ return sqlQueryBuilder.toString();
}
+ @SuppressWarnings("unused") // used in string constant in prepareDatabase
public static final OutputStream DEV_NULL = new OutputStream() {
@Override
public void write(int b) {
}
};
- @BeforeClass
- public static void prepareDerbyDatabase() throws Exception {
- System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
-
- Class.forName(DRIVER_CLASS);
- try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true")) {
- createTable(conn, JDBCTestBase.INPUT_TABLE);
+ public static void initSchema(DbMetadata dbMetadata) throws ClassNotFoundException, SQLException {
+ System.setProperty("derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
+ Class.forName(dbMetadata.getDriverClass());
+ try (Connection conn = DriverManager.getConnection(dbMetadata.getInitUrl())) {
+ createTable(conn, JdbcTestFixture.INPUT_TABLE);
createTable(conn, OUTPUT_TABLE);
createTable(conn, OUTPUT_TABLE_2);
+ }
+ }
+
+ static void initData(DbMetadata dbMetadata) throws SQLException {
+ try (Connection conn = DriverManager.getConnection(dbMetadata.getUrl())) {
insertDataIntoInputTable(conn);
}
}
@@ -150,16 +162,23 @@ public class JDBCTestBase {
stat.close();
}
- @AfterClass
- public static void cleanUpDerbyDatabases() throws Exception {
- Class.forName(DRIVER_CLASS);
+ public static void cleanUpDatabasesStatic(DbMetadata dbMetadata) throws ClassNotFoundException, SQLException {
+ Class.forName(dbMetadata.getDriverClass());
try (
- Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
- Statement stat = conn.createStatement()) {
+ Connection conn = DriverManager.getConnection(dbMetadata.getUrl());
+ Statement stat = conn.createStatement()) {
stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2);
}
}
+
+ static void cleanupData(String url) throws Exception {
+ try (Connection conn = DriverManager.getConnection(url);
+ Statement st = conn.createStatement()) {
+ st.executeUpdate("delete from " + INPUT_TABLE);
+ }
+ }
+
}