You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/03/10 16:17:06 UTC

[flink] 08/10: [FLINK-15782][connectors/jdbc] refactor JDBC sink tests Changes: 1. extract DbMetadata to allow to use different databases 2. extract test fixture from the base class 3. use inheritance only for @Before/@After behaviour

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 40cab49012e2a54db36da2b44b951a237057bd49
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Tue Jan 14 00:43:30 2020 +0100

    [FLINK-15782][connectors/jdbc] refactor JDBC sink tests
    Changes:
    1. extract DbMetadata to allow to use different databases
    2. extract test fixture from the base class
    3. use inheritance only for @Before/@After behaviour
---
 .../apache/flink/api/java/io/jdbc/DbMetadata.java  |  36 +++++
 .../flink/api/java/io/jdbc/DerbyDbMetadata.java    |  64 ++++++++
 .../flink/api/java/io/jdbc/JDBCDataTestBase.java   |  37 +++++
 .../flink/api/java/io/jdbc/JDBCFullTest.java       |  43 +++---
 .../api/java/io/jdbc/JDBCInputFormatTest.java      |  78 +++++-----
 .../api/java/io/jdbc/JDBCLookupFunctionITCase.java |   8 +-
 .../api/java/io/jdbc/JDBCOutputFormatTest.java     |  61 ++++----
 .../flink/api/java/io/jdbc/JDBCTestBase.java       | 164 +++------------------
 .../flink/api/java/io/jdbc/JDBCTestCheckpoint.java |  35 +++++
 .../java/io/jdbc/JDBCUpsertTableSinkITCase.java    |   7 +-
 .../java/io/jdbc/JdbcTableOutputFormatTest.java    |  15 +-
 .../{JDBCTestBase.java => JdbcTestFixture.java}    | 123 +++++++++-------
 12 files changed, 380 insertions(+), 291 deletions(-)

diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DbMetadata.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DbMetadata.java
new file mode 100644
index 0000000..14731e4
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DbMetadata.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import javax.sql.XADataSource;
+
+import java.io.Serializable;
+
+/**
+ * Describes a database: driver class, schema, connection URLs and XA data source.
+ */
+public interface DbMetadata extends Serializable {
+
+	String getInitUrl();
+
+	String getUrl();
+
+	XADataSource buildXaDataSource();
+
+	String getDriverClass();
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyDbMetadata.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyDbMetadata.java
new file mode 100644
index 0000000..79a62bc
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyDbMetadata.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.derby.jdbc.EmbeddedXADataSource;
+
+import javax.sql.XADataSource;
+
+/**
+ * {@link DbMetadata} for an in-memory Derby database named after the given schema.
+ */
+public class DerbyDbMetadata implements DbMetadata {
+	private final String dbName;
+	private final String dbInitUrl;
+	private final String url;
+
+	public DerbyDbMetadata(String schemaName) {
+		dbName = "memory:" + schemaName;
+		url = "jdbc:derby:" + dbName;
+		dbInitUrl = url + ";create=true";
+	}
+
+	public String getDbName() {
+		return dbName;
+	}
+
+	@Override
+	public String getInitUrl() {
+		return dbInitUrl;
+	}
+
+	@Override
+	public XADataSource buildXaDataSource() {
+		EmbeddedXADataSource ds = new EmbeddedXADataSource();
+		ds.setDatabaseName(dbName);
+		return ds;
+	}
+
+	@Override
+	public String getDriverClass() {
+		return "org.apache.derby.jdbc.EmbeddedDriver";
+	}
+
+	@Override
+	public String getUrl() {
+		return url;
+	}
+
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCDataTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCDataTestBase.java
new file mode 100644
index 0000000..8fb1539
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCDataTestBase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.junit.Before;
+
+import java.sql.SQLException;
+
+/**
+ * Base class for JDBC tests that use data from {@link JdbcTestFixture}. It uses {@link DerbyDbMetadata} and inserts the data before each test.
+ */
+abstract class JDBCDataTestBase extends JDBCTestBase {
+	@Before
+	public void initData() throws SQLException {
+		JdbcTestFixture.initData(getDbMetadata());
+	}
+
+	@Override
+	protected DbMetadata getDbMetadata() {
+		return JdbcTestFixture.DERBY_EBOOKSHOP_DB;
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index 14dc85a..28ac1a1 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -26,7 +26,9 @@ import org.apache.flink.types.Row;
 
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -35,12 +37,17 @@ import java.sql.ResultSet;
 import java.sql.Statement;
 import java.sql.Types;
 
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.OUTPUT_TABLE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.ROW_TYPE_INFO;
 import static org.hamcrest.core.StringContains.containsString;
 
 /**
  * Tests using both {@link JDBCInputFormat} and {@link JDBCOutputFormat}.
  */
-public class JDBCFullTest extends JDBCTestBase {
+public class JDBCFullTest extends JDBCDataTestBase {
+
+	@Rule
+	public ExpectedException exception = ExpectedException.none();
 
 	@Test
 	public void testWithoutParallelism() throws Exception {
@@ -58,8 +65,8 @@ public class JDBCFullTest extends JDBCTestBase {
 		exception.expectMessage(containsString("field index: 3, field value: 11.11."));
 
 		JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-			.setDrivername(JDBCTestBase.DRIVER_CLASS)
-			.setDBUrl(JDBCTestBase.DB_URL)
+			.setDrivername(getDbMetadata().getDriverClass())
+			.setDBUrl(getDbMetadata().getUrl())
 			.setQuery("insert into newbooks (id, title, author, price, qty) values (?,?,?,?,?)")
 			.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
 			.finish();
@@ -73,18 +80,18 @@ public class JDBCFullTest extends JDBCTestBase {
 	private void runTest(boolean exploitParallelism) throws Exception {
 		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
 		JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(JDBCTestBase.DRIVER_CLASS)
-				.setDBUrl(JDBCTestBase.DB_URL)
-				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
+				.setDrivername(getDbMetadata().getDriverClass())
+				.setDBUrl(getDbMetadata().getUrl())
+				.setQuery(JdbcTestFixture.SELECT_ALL_BOOKS)
 				.setRowTypeInfo(ROW_TYPE_INFO);
 
 		if (exploitParallelism) {
 			final int fetchSize = 1;
-			final long min = JDBCTestBase.TEST_DATA[0].id;
-			final long max = JDBCTestBase.TEST_DATA[JDBCTestBase.TEST_DATA.length - fetchSize].id;
+			final long min = JdbcTestFixture.TEST_DATA[0].id;
+			final long max = JdbcTestFixture.TEST_DATA[JdbcTestFixture.TEST_DATA.length - fetchSize].id;
 			//use a "splittable" query to exploit parallelism
 			inputBuilder = inputBuilder
-					.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+					.setQuery(JdbcTestFixture.SELECT_ALL_BOOKS_SPLIT_BY_ID)
 					.setParametersProvider(new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize));
 		}
 		DataSet<Row> source = environment.createInput(inputBuilder.finish());
@@ -93,8 +100,8 @@ public class JDBCFullTest extends JDBCTestBase {
 		//some databases don't null values correctly when no column type was specified
 		//in PreparedStatement.setObject (see its javadoc for more details)
 		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(JDBCTestBase.DRIVER_CLASS)
-				.setDBUrl(JDBCTestBase.DB_URL)
+				.setDrivername(getDbMetadata().getDriverClass())
+				.setDBUrl(getDbMetadata().getUrl())
 				.setQuery("insert into newbooks (id, title, author, price, qty) values (?,?,?,?,?)")
 				.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
 				.finish());
@@ -102,24 +109,24 @@ public class JDBCFullTest extends JDBCTestBase {
 		environment.execute();
 
 		try (
-			Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
-			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
-			ResultSet resultSet = statement.executeQuery()
+				Connection dbConn = DriverManager.getConnection(getDbMetadata().getUrl());
+				PreparedStatement statement = dbConn.prepareStatement(JdbcTestFixture.SELECT_ALL_NEWBOOKS);
+				ResultSet resultSet = statement.executeQuery()
 		) {
 			int count = 0;
 			while (resultSet.next()) {
 				count++;
 			}
-			Assert.assertEquals(JDBCTestBase.TEST_DATA.length, count);
+			Assert.assertEquals(JdbcTestFixture.TEST_DATA.length, count);
 		}
 	}
 
 	@After
 	public void clearOutputTable() throws Exception {
-		Class.forName(DRIVER_CLASS);
+		Class.forName(getDbMetadata().getDriverClass());
 		try (
-			Connection conn = DriverManager.getConnection(DB_URL);
-			Statement stat = conn.createStatement()) {
+				Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
+				Statement stat = conn.createStatement()) {
 			stat.execute("DELETE FROM " + OUTPUT_TABLE);
 
 			stat.close();
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 559976d..ca4f8e3 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
+import org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TestEntry;
 import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
 import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
 import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
@@ -34,10 +35,15 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.ROW_TYPE_INFO;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.SELECT_ALL_BOOKS;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.SELECT_EMPTY;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TEST_DATA;
+
 /**
  * Tests for the {@link JDBCInputFormat}.
  */
-public class JDBCInputFormatTest extends JDBCTestBase {
+public class JDBCInputFormatTest extends JDBCDataTestBase {
 
 	private JDBCInputFormat jdbcInputFormat;
 
@@ -54,7 +60,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	public void testUntypedRowInfo() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl(DB_URL)
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery(SELECT_ALL_BOOKS)
 				.finish();
 		jdbcInputFormat.openInputFormat();
@@ -64,7 +70,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	public void testInvalidDriver() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl(DB_URL)
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery(SELECT_ALL_BOOKS)
 				.setRowTypeInfo(ROW_TYPE_INFO)
 				.finish();
@@ -74,7 +80,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	@Test(expected = IllegalArgumentException.class)
 	public void testInvalidURL() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
 				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
 				.setQuery(SELECT_ALL_BOOKS)
 				.setRowTypeInfo(ROW_TYPE_INFO)
@@ -85,8 +91,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	@Test(expected = IllegalArgumentException.class)
 	public void testInvalidQuery() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery("iamnotsql")
 				.setRowTypeInfo(ROW_TYPE_INFO)
 				.finish();
@@ -96,7 +102,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	@Test(expected = IllegalArgumentException.class)
 	public void testIncompleteConfiguration() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
 				.setQuery(SELECT_ALL_BOOKS)
 				.setRowTypeInfo(ROW_TYPE_INFO)
 				.finish();
@@ -105,8 +111,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	@Test(expected = IllegalArgumentException.class)
 	public void testInvalidFetchSize() {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(SELECT_ALL_BOOKS)
 			.setRowTypeInfo(ROW_TYPE_INFO)
 			.setFetchSize(-7)
@@ -116,8 +122,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	@Test
 	public void testValidFetchSizeIntegerMin() {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(SELECT_ALL_BOOKS)
 			.setRowTypeInfo(ROW_TYPE_INFO)
 			.setFetchSize(Integer.MIN_VALUE)
@@ -127,15 +133,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	@Test
 	public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(SELECT_ALL_BOOKS)
 			.setRowTypeInfo(ROW_TYPE_INFO)
 			.finish();
 		jdbcInputFormat.openInputFormat();
 
-		Class.forName(DRIVER_CLASS);
-		final int defaultFetchSize = DriverManager.getConnection(DB_URL).createStatement().getFetchSize();
+		Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
+		final int defaultFetchSize = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl()).createStatement().getFetchSize();
 
 		Assert.assertEquals(defaultFetchSize, jdbcInputFormat.getStatement().getFetchSize());
 	}
@@ -144,8 +150,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	public void testFetchSizeCanBeConfigured() throws SQLException {
 		final int desiredFetchSize = 10_000;
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(SELECT_ALL_BOOKS)
 			.setRowTypeInfo(ROW_TYPE_INFO)
 			.setFetchSize(desiredFetchSize)
@@ -158,15 +164,15 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	public void testDefaultAutoCommitIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
 
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(SELECT_ALL_BOOKS)
 			.setRowTypeInfo(ROW_TYPE_INFO)
 			.finish();
 		jdbcInputFormat.openInputFormat();
 
-		Class.forName(DRIVER_CLASS);
-		final boolean defaultAutoCommit = DriverManager.getConnection(DB_URL).getAutoCommit();
+		Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
+		final boolean defaultAutoCommit = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl()).getAutoCommit();
 
 		Assert.assertEquals(defaultAutoCommit, jdbcInputFormat.getDbConn().getAutoCommit());
 
@@ -177,8 +183,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 
 		final boolean desiredAutoCommit = false;
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(SELECT_ALL_BOOKS)
 			.setRowTypeInfo(ROW_TYPE_INFO)
 			.setAutoCommit(desiredAutoCommit)
@@ -192,8 +198,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	@Test
 	public void testJDBCInputFormatWithoutParallelism() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery(SELECT_ALL_BOOKS)
 				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
@@ -223,9 +229,9 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
 		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
+				.setQuery(JdbcTestFixture.SELECT_ALL_BOOKS_SPLIT_BY_ID)
 				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setParametersProvider(pramProvider)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
@@ -259,9 +265,9 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		final long fetchSize = max + 1; //generate a single split
 		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
+				.setQuery(JdbcTestFixture.SELECT_ALL_BOOKS_SPLIT_BY_ID)
 				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setParametersProvider(pramProvider)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
@@ -295,9 +301,9 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		queryParameters[1] = new String[]{TEST_DATA[0].author};
 		ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
-				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
+				.setQuery(JdbcTestFixture.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
 				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setParametersProvider(paramProvider)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
@@ -335,8 +341,8 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	@Test
 	public void testEmptyResults() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery(SELECT_EMPTY)
 				.setRowTypeInfo(ROW_TYPE_INFO)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
index 16a0eeb..dabd88c 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java
@@ -45,8 +45,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCTestBase.DRIVER_CLASS;
-
 /**
  * IT case for {@link JDBCLookupFunction}.
  */
@@ -69,9 +67,9 @@ public class JDBCLookupFunctionITCase extends AbstractTestBase {
 
 	@Before
 	public void before() throws ClassNotFoundException, SQLException {
-		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+		System.setProperty("derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
 
-		Class.forName(DRIVER_CLASS);
+		Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
 		try (
 			Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
 			Statement stat = conn.createStatement()) {
@@ -123,7 +121,7 @@ public class JDBCLookupFunctionITCase extends AbstractTestBase {
 
 	@After
 	public void clearOutputTable() throws Exception {
-		Class.forName(DRIVER_CLASS);
+		Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
 		try (
 				Connection conn = DriverManager.getConnection(DB_URL);
 				Statement stat = conn.createStatement()) {
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 104fb87..cc9a92a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -32,13 +32,18 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.INPUT_TABLE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.OUTPUT_TABLE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.OUTPUT_TABLE_2;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 /**
  * Tests for the {@link JDBCOutputFormat}.
  */
-public class JDBCOutputFormatTest extends JDBCTestBase {
+public class JDBCOutputFormatTest extends JDBCDataTestBase {
 
 	private JDBCOutputFormat jdbcOutputFormat;
 
@@ -54,7 +59,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	public void testInvalidDriver() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
 				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl(DB_URL)
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
 				.finish();
 		jdbcOutputFormat.open(0, 1);
@@ -63,7 +68,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	@Test(expected = IOException.class)
 	public void testInvalidURL() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
 				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
 				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
 				.finish();
@@ -73,8 +78,8 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	@Test(expected = IOException.class)
 	public void testInvalidQuery() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery("iamnotsql")
 				.finish();
 		jdbcOutputFormat.open(0, 1);
@@ -83,7 +88,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	@Test(expected = NullPointerException.class)
 	public void testIncompleteConfiguration() {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
 				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
 				.finish();
 	}
@@ -91,8 +96,8 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	@Test(expected = RuntimeException.class)
 	public void testIncompatibleTypes() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
 				.finish();
 		jdbcOutputFormat.open(0, 1);
@@ -111,8 +116,8 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	@Test(expected = RuntimeException.class)
 	public void testExceptionOnInvalidType() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
 			.setSqlTypes(new int[] {
 				Types.INTEGER,
@@ -123,7 +128,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 			.finish();
 		jdbcOutputFormat.open(0, 1);
 
-		JDBCTestBase.TestEntry entry = TEST_DATA[0];
+		JdbcTestFixture.TestEntry entry = TEST_DATA[0];
 		Row row = new Row(5);
 		row.setField(0, entry.id);
 		row.setField(1, entry.title);
@@ -137,8 +142,8 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	public void testExceptionOnClose() throws IOException {
 
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
 			.setSqlTypes(new int[] {
 				Types.INTEGER,
@@ -149,7 +154,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 			.finish();
 		jdbcOutputFormat.open(0, 1);
 
-		JDBCTestBase.TestEntry entry = TEST_DATA[0];
+		JdbcTestFixture.TestEntry entry = TEST_DATA[0];
 		Row row = new Row(5);
 		row.setField(0, entry.id);
 		row.setField(1, entry.title);
@@ -165,22 +170,22 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	@Test
 	public void testJDBCOutputFormat() throws IOException, SQLException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername(DRIVER_CLASS)
-				.setDBUrl(DB_URL)
+				.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+				.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 				.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
 				.finish();
 		jdbcOutputFormat.open(0, 1);
 
-		for (JDBCTestBase.TestEntry entry : TEST_DATA) {
+		for (JdbcTestFixture.TestEntry entry : TEST_DATA) {
 			jdbcOutputFormat.writeRecord(toRow(entry));
 		}
 
 		jdbcOutputFormat.close();
 
 		try (
-			Connection dbConn = DriverManager.getConnection(DB_URL);
-			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
-			ResultSet resultSet = statement.executeQuery()
+				Connection dbConn = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl());
+				PreparedStatement statement = dbConn.prepareStatement(JdbcTestFixture.SELECT_ALL_NEWBOOKS);
+				ResultSet resultSet = statement.executeQuery()
 		) {
 			int recordCount = 0;
 			while (resultSet.next()) {
@@ -199,14 +204,14 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 	@Test
 	public void testFlush() throws SQLException, IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-			.setDrivername(DRIVER_CLASS)
-			.setDBUrl(DB_URL)
+			.setDrivername(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass())
+			.setDBUrl(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl())
 			.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_2))
 			.setBatchInterval(3)
 			.finish();
 		try (
-			Connection dbConn = DriverManager.getConnection(DB_URL);
-			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS_2)
+				Connection dbConn = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl());
+				PreparedStatement statement = dbConn.prepareStatement(JdbcTestFixture.SELECT_ALL_NEWBOOKS_2)
 		) {
 			jdbcOutputFormat.open(0, 1);
 			for (int i = 0; i < 2; ++i) {
@@ -235,10 +240,10 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 
 	@After
 	public void clearOutputTable() throws Exception {
-		Class.forName(DRIVER_CLASS);
+		Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
 		try (
-			Connection conn = DriverManager.getConnection(DB_URL);
-			Statement stat = conn.createStatement()) {
+				Connection conn = DriverManager.getConnection(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getUrl());
+				Statement stat = conn.createStatement()) {
 			stat.execute("DELETE FROM " + OUTPUT_TABLE);
 
 			stat.close();
@@ -246,7 +251,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 		}
 	}
 
-	static Row toRow(TestEntry entry) {
+	static Row toRow(JdbcTestFixture.TestEntry entry) {
 		Row row = new Row(5);
 		row.setField(0, entry.id);
 		row.setField(1, entry.title);
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 021d28f..ffd25e5 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,148 +17,29 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.junit.After;
+import org.junit.Before;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.ExpectedException;
-
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.cleanUpDatabasesStatic;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.cleanupData;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.initSchema;
 
 /**
- * Base test class for JDBC Input and Output.
+ * Base class for JDBC tests using DDL from {@link JdbcTestFixture}. It creates the tables before each test and drops them afterwards.
  */
-public class JDBCTestBase {
-
-	@Rule
-	public ExpectedException exception = ExpectedException.none();
-
-	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
-	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
-	public static final String INPUT_TABLE = "books";
-	public static final String OUTPUT_TABLE = "newbooks";
-	public static final String OUTPUT_TABLE_2 = "newbooks2";
-	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
-	public static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE;
-	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
-	public static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + OUTPUT_TABLE_2;
-	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
-	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
-	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
-	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
-
-	public static final TestEntry[] TEST_DATA = {
-			new TestEntry(1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11),
-			new TestEntry(1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22),
-			new TestEntry(1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33),
-			new TestEntry(1004, ("A Cup of Java"), ("Kumar"), 44.44, 44),
-			new TestEntry(1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55),
-			new TestEntry(1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66),
-			new TestEntry(1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77),
-			new TestEntry(1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88),
-			new TestEntry(1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99),
-			new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010)
-	};
-
-	static class TestEntry {
-		protected final Integer id;
-		protected final String title;
-		protected final String author;
-		protected final Double price;
-		protected final Integer qty;
+public abstract class JDBCTestBase {
 
-		private TestEntry(Integer id, String title, String author, Double price, Integer qty) {
-			this.id = id;
-			this.title = title;
-			this.author = author;
-			this.price = price;
-			this.qty = qty;
-		}
+	@Before
+	public final void before() throws Exception {
+		initSchema(getDbMetadata());
 	}
 
-	public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
-		BasicTypeInfo.INT_TYPE_INFO,
-		BasicTypeInfo.STRING_TYPE_INFO,
-		BasicTypeInfo.STRING_TYPE_INFO,
-		BasicTypeInfo.DOUBLE_TYPE_INFO,
-		BasicTypeInfo.INT_TYPE_INFO);
-
-	public static String getCreateQuery(String tableName) {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
-		sqlQueryBuilder.append(tableName).append(" (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-		return sqlQueryBuilder.toString();
-	}
-
-	public static String getInsertQuery() {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		for (int i = 0; i < TEST_DATA.length; i++) {
-			sqlQueryBuilder.append("(")
-			.append(TEST_DATA[i].id).append(",'")
-			.append(TEST_DATA[i].title).append("','")
-			.append(TEST_DATA[i].author).append("',")
-			.append(TEST_DATA[i].price).append(",")
-			.append(TEST_DATA[i].qty).append(")");
-			if (i < TEST_DATA.length - 1) {
-				sqlQueryBuilder.append(",");
-			}
-		}
-		String insertQuery = sqlQueryBuilder.toString();
-		return insertQuery;
+	@After
+	public final void after() throws Exception {
+		cleanupData(getDbMetadata().getUrl());
+		cleanUpDatabasesStatic(getDbMetadata());
 	}
 
-	public static final OutputStream DEV_NULL = new OutputStream() {
-		@Override
-		public void write(int b) {
-		}
-	};
-
-	@BeforeClass
-	public static void prepareDerbyDatabase() throws Exception {
-		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+	protected abstract DbMetadata getDbMetadata();
 
-		Class.forName(DRIVER_CLASS);
-		try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true")) {
-			createTable(conn, JDBCTestBase.INPUT_TABLE);
-			createTable(conn, OUTPUT_TABLE);
-			createTable(conn, OUTPUT_TABLE_2);
-			insertDataIntoInputTable(conn);
-		}
-	}
-
-	private static void createTable(Connection conn, String tableName) throws SQLException {
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(getCreateQuery(tableName));
-		stat.close();
-	}
-
-	private static void insertDataIntoInputTable(Connection conn) throws SQLException {
-		Statement stat = conn.createStatement();
-		stat.execute(getInsertQuery());
-		stat.close();
-	}
-
-	@AfterClass
-	public static void cleanUpDerbyDatabases() throws Exception {
-		Class.forName(DRIVER_CLASS);
-		try (
-			Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
-			Statement stat = conn.createStatement()) {
-
-			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
-			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
-			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2);
-		}
-	}
 }
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestCheckpoint.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestCheckpoint.java
new file mode 100644
index 0000000..6086011
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestCheckpoint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+/**
+ * Holds id and indices of items in {@link JdbcTestFixture#TEST_DATA}.
+ */
+public class JDBCTestCheckpoint {
+	public final long id;
+	public final int[] dataItemsIdx;
+
+	JDBCTestCheckpoint(long id, int... dataItemsIdx) {
+		this.id = id;
+		this.dataItemsIdx = dataItemsIdx;
+	}
+
+	public JDBCTestCheckpoint withCheckpointId(long id) {
+		return new JDBCTestCheckpoint(id, dataItemsIdx);
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
index 6f4f810..76f60bc 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSinkITCase.java
@@ -41,7 +41,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCTestBase.DRIVER_CLASS;
 import static org.apache.flink.api.java.io.jdbc.JdbcTableOutputFormatTest.check;
 
 /**
@@ -55,9 +54,9 @@ public class JDBCUpsertTableSinkITCase extends AbstractTestBase {
 
 	@Before
 	public void before() throws ClassNotFoundException, SQLException {
-		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+		System.setProperty("derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
 
-		Class.forName(DRIVER_CLASS);
+		Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
 		try (
 			Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
 			Statement stat = conn.createStatement()) {
@@ -79,7 +78,7 @@ public class JDBCUpsertTableSinkITCase extends AbstractTestBase {
 
 	@After
 	public void clearOutputTable() throws Exception {
-		Class.forName(DRIVER_CLASS);
+		Class.forName(JdbcTestFixture.DERBY_EBOOKSHOP_DB.getDriverClass());
 		try (
 			Connection conn = DriverManager.getConnection(DB_URL);
 			Statement stat = conn.createStatement()) {
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTableOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTableOutputFormatTest.java
index 234657d..aaaa92c 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTableOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTableOutputFormatTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.io.jdbc;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TestEntry;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Row;
 
@@ -39,13 +40,15 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormatTest.toRow;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.OUTPUT_TABLE;
+import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TEST_DATA;
 import static org.junit.Assert.assertArrayEquals;
 import static org.mockito.Mockito.doReturn;
 
 /**
  * Tests for the {@link JdbcBatchingOutputFormat}.
  */
-public class JdbcTableOutputFormatTest extends JDBCTestBase {
+public class JdbcTableOutputFormatTest extends JDBCDataTestBase {
 
 	private TableJdbcUpsertOutputFormat format;
 	private String[] fieldNames;
@@ -60,7 +63,7 @@ public class JdbcTableOutputFormatTest extends JDBCTestBase {
 	@Test
 	public void testJDBCOutputFormat() throws Exception {
 		JDBCOptions options = JDBCOptions.builder()
-				.setDBUrl(DB_URL)
+				.setDBUrl(getDbMetadata().getUrl())
 				.setTableName(OUTPUT_TABLE)
 				.build();
 		JdbcDmlOptions dmlOptions = JdbcDmlOptions.builder()
@@ -100,7 +103,7 @@ public class JdbcTableOutputFormatTest extends JDBCTestBase {
 	}
 
 	private void check(Row[] rows) throws SQLException {
-		check(rows, DB_URL, OUTPUT_TABLE, fieldNames);
+		check(rows, getDbMetadata().getUrl(), OUTPUT_TABLE, fieldNames);
 	}
 
 	static void check(Row[] rows, String url, String table, String[] fields) throws SQLException {
@@ -131,10 +134,10 @@ public class JdbcTableOutputFormatTest extends JDBCTestBase {
 			format.close();
 		}
 		format = null;
-		Class.forName(DRIVER_CLASS);
+		Class.forName(getDbMetadata().getDriverClass());
 		try (
-			Connection conn = DriverManager.getConnection(DB_URL);
-			Statement stat = conn.createStatement()) {
+				Connection conn = DriverManager.getConnection(getDbMetadata().getUrl());
+				Statement stat = conn.createStatement()) {
 			stat.execute("DELETE FROM " + OUTPUT_TABLE);
 
 			stat.close();
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTestFixture.java
similarity index 55%
copy from flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
copy to flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTestFixture.java
index 021d28f..071a2af 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JdbcTestFixture.java
@@ -21,38 +21,32 @@ package org.apache.flink.api.java.io.jdbc;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.ExpectedException;
-
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 
 /**
- * Base test class for JDBC Input and Output.
+ * Test data and helper objects for JDBC tests.
  */
-public class JDBCTestBase {
-
-	@Rule
-	public ExpectedException exception = ExpectedException.none();
+@SuppressWarnings("SpellCheckingInspection")
+public class JdbcTestFixture {
+	public static final JDBCTestCheckpoint CP0 = new JDBCTestCheckpoint(0, 1, 2, 3);
+	public static final JDBCTestCheckpoint CP1 = new JDBCTestCheckpoint(1, 4, 5, 6);
 
-	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
-	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
 	public static final String INPUT_TABLE = "books";
-	public static final String OUTPUT_TABLE = "newbooks";
-	public static final String OUTPUT_TABLE_2 = "newbooks2";
-	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
-	public static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE;
-	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
-	public static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + OUTPUT_TABLE_2;
-	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
+	static final String OUTPUT_TABLE = "newbooks";
+	static final String OUTPUT_TABLE_2 = "newbooks2";
+	static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
+	static final String SELECT_ID_BOOKS = "select id from " + INPUT_TABLE;
+	static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
+	static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + OUTPUT_TABLE_2;
+	static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
 	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
-	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
-	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
+	static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+	static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = SELECT_ALL_BOOKS + " WHERE author = ?";
 
 	public static final TestEntry[] TEST_DATA = {
 			new TestEntry(1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11),
@@ -67,12 +61,18 @@ public class JDBCTestBase {
 			new TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010)
 	};
 
-	static class TestEntry {
-		protected final Integer id;
-		protected final String title;
-		protected final String author;
-		protected final Double price;
-		protected final Integer qty;
+	private static final String EBOOKSHOP_SCHEMA_NAME = "ebookshop";
+	public static final DerbyDbMetadata DERBY_EBOOKSHOP_DB = new DerbyDbMetadata(EBOOKSHOP_SCHEMA_NAME);
+
+	/**
+	 * A single book record (id, title, author, price, qty) used as an element of {@link #TEST_DATA}.
+	 */
+	public static class TestEntry implements Serializable {
+		public final Integer id;
+		public final String title;
+		public final String author;
+		public final Double price;
+		public final Integer qty;
 
 		private TestEntry(Integer id, String title, String author, Double price, Integer qty) {
 			this.id = id;
@@ -81,25 +81,34 @@ public class JDBCTestBase {
 			this.price = price;
 			this.qty = qty;
 		}
+
+		@Override
+		public String toString() {
+			return "TestEntry{" +
+					"id=" + id +
+					", title='" + title + '\'' +
+					", author='" + author + '\'' +
+					", price=" + price +
+					", qty=" + qty +
+					'}';
+		}
 	}
 
-	public static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
+	static final RowTypeInfo ROW_TYPE_INFO = new RowTypeInfo(
 		BasicTypeInfo.INT_TYPE_INFO,
 		BasicTypeInfo.STRING_TYPE_INFO,
 		BasicTypeInfo.STRING_TYPE_INFO,
 		BasicTypeInfo.DOUBLE_TYPE_INFO,
 		BasicTypeInfo.INT_TYPE_INFO);
 
-	public static String getCreateQuery(String tableName) {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
-		sqlQueryBuilder.append(tableName).append(" (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-		return sqlQueryBuilder.toString();
+	private static String getCreateQuery(String tableName) {
+		return "CREATE TABLE " + tableName + " (" +
+				"id INT NOT NULL DEFAULT 0," +
+				"title VARCHAR(50) DEFAULT NULL," +
+				"author VARCHAR(50) DEFAULT NULL," +
+				"price FLOAT DEFAULT NULL," +
+				"qty INT DEFAULT NULL," +
+				"PRIMARY KEY (id))";
 	}
 
 	public static String getInsertQuery() {
@@ -115,25 +124,28 @@ public class JDBCTestBase {
 				sqlQueryBuilder.append(",");
 			}
 		}
-		String insertQuery = sqlQueryBuilder.toString();
-		return insertQuery;
+		return sqlQueryBuilder.toString();
 	}
 
+	@SuppressWarnings("unused") // referenced by name in the "derby.stream.error.field" string constant in initSchema
 	public static final OutputStream DEV_NULL = new OutputStream() {
 		@Override
 		public void write(int b) {
 		}
 	};
 
-	@BeforeClass
-	public static void prepareDerbyDatabase() throws Exception {
-		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
-
-		Class.forName(DRIVER_CLASS);
-		try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true")) {
-			createTable(conn, JDBCTestBase.INPUT_TABLE);
+	public static void initSchema(DbMetadata dbMetadata) throws ClassNotFoundException, SQLException {
+		System.setProperty("derby.stream.error.field", JdbcTestFixture.class.getCanonicalName() + ".DEV_NULL");
+		Class.forName(dbMetadata.getDriverClass());
+		try (Connection conn = DriverManager.getConnection(dbMetadata.getInitUrl())) {
+			createTable(conn, JdbcTestFixture.INPUT_TABLE);
 			createTable(conn, OUTPUT_TABLE);
 			createTable(conn, OUTPUT_TABLE_2);
+		}
+	}
+
+	static void initData(DbMetadata dbMetadata) throws SQLException {
+		try (Connection conn = DriverManager.getConnection(dbMetadata.getUrl())) {
 			insertDataIntoInputTable(conn);
 		}
 	}
@@ -150,16 +162,23 @@ public class JDBCTestBase {
 		stat.close();
 	}
 
-	@AfterClass
-	public static void cleanUpDerbyDatabases() throws Exception {
-		Class.forName(DRIVER_CLASS);
+	public static void cleanUpDatabasesStatic(DbMetadata dbMetadata) throws ClassNotFoundException, SQLException {
+		Class.forName(dbMetadata.getDriverClass());
 		try (
-			Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
-			Statement stat = conn.createStatement()) {
+				Connection conn = DriverManager.getConnection(dbMetadata.getUrl());
+				Statement stat = conn.createStatement()) {
 
 			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
 			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
 			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2);
 		}
 	}
+
+	static void cleanupData(String url) throws Exception {
+		try (Connection conn = DriverManager.getConnection(url);
+			Statement st = conn.createStatement()) {
+			st.executeUpdate("delete from " + INPUT_TABLE);
+		}
+	}
+
 }