You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/11 16:11:43 UTC

[flink] branch master updated: [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source (#9029)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a91d951  [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source (#9029)
a91d951 is described below

commit a91d9515dc3e2e694626a5fe2625a9d7402f67ef
Author: TsReaper <ts...@gmail.com>
AuthorDate: Fri Jul 12 00:11:25 2019 +0800

    [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source (#9029)
---
 .../flink/api/java/io/jdbc/JDBCLookupFunction.java |   2 +-
 .../flink/api/java/io/jdbc/JDBCLookupOptions.java  |  15 +-
 .../apache/flink/api/java/io/jdbc/JDBCOptions.java |  18 +-
 .../flink/api/java/io/jdbc/JDBCReadOptions.java    | 146 +++++++++++
 .../flink/api/java/io/jdbc/JDBCTableSource.java    | 135 +++++++++-
 .../java/io/jdbc/JDBCTableSourceSinkFactory.java   | 192 ++++++++++++++
 .../api/java/io/jdbc/JDBCUpsertTableSink.java      |  17 ++
 .../api/java/io/jdbc/dialect/JDBCDialect.java      |   2 +-
 .../split/NumericBetweenParametersProvider.java    |  75 ++++--
 .../flink/table/descriptors/JDBCValidator.java     | 133 ++++++++++
 .../org.apache.flink.table.factories.TableFactory  |  16 ++
 .../flink/api/java/io/jdbc/JDBCFullTest.java       |   2 +-
 .../api/java/io/jdbc/JDBCInputFormatTest.java      |   4 +-
 .../io/jdbc/JDBCTableSourceSinkFactoryTest.java    | 276 +++++++++++++++++++++
 .../NumericBetweenParametersProviderTest.java      | 122 +++++++++
 15 files changed, 1122 insertions(+), 33 deletions(-)

diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
index 4a46441..80d6127 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
@@ -114,7 +114,7 @@ public class JDBCLookupFunction extends TableFunction<Row> {
 		try {
 			establishConnection();
 			statement = dbConn.prepareStatement(query);
-			this.cache = cacheMaxSize == -1 ? null : CacheBuilder.newBuilder()
+			this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
 					.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
 					.maximumSize(cacheMaxSize)
 					.build();
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
index dd6dbbe..6137324 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;
 
@@ -53,8 +54,20 @@ public class JDBCLookupOptions implements Serializable {
 		return new Builder();
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (o instanceof JDBCLookupOptions) {
+			JDBCLookupOptions options = (JDBCLookupOptions) o;
+			return Objects.equals(cacheMaxSize, options.cacheMaxSize) &&
+				Objects.equals(cacheExpireMs, options.cacheExpireMs) &&
+				Objects.equals(maxRetryTimes, options.maxRetryTimes);
+		} else {
+			return false;
+		}
+	}
+
 	/**
-	 * Builder of {@link JDBCOptions}.
+	 * Builder of {@link JDBCLookupOptions}.
 	 */
 	public static class Builder {
 		private long cacheMaxSize = -1L;
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
index e026a32..f68b14a 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
@@ -21,12 +21,13 @@ package org.apache.flink.api.java.io.jdbc;
 import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
 import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
 
+import java.util.Objects;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Options for the JDBC connector.
+ * Common options of {@link JDBCScanOptions} and {@link JDBCLookupOptions} for the JDBC connector.
  */
 public class JDBCOptions {
 
@@ -75,6 +76,21 @@ public class JDBCOptions {
 		return new Builder();
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (o instanceof JDBCOptions) {
+			JDBCOptions options = (JDBCOptions) o;
+			return Objects.equals(dbURL, options.dbURL) &&
+				Objects.equals(tableName, options.tableName) &&
+				Objects.equals(driverName, options.driverName) &&
+				Objects.equals(username, options.username) &&
+				Objects.equals(password, options.password) &&
+				Objects.equals(dialect.getClass().getName(), options.dialect.getClass().getName());
+		} else {
+			return false;
+		}
+	}
+
 	/**
 	 * Builder of {@link JDBCOptions}.
 	 */
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java
new file mode 100644
index 0000000..4591e10
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Options for the JDBC scan.
+ */
+public class JDBCReadOptions implements Serializable {
+
+	private final String partitionColumnName;
+	private final Long partitionLowerBound;
+	private final Long partitionUpperBound;
+	private final Integer numPartitions;
+
+	private final int fetchSize;
+
+	private JDBCReadOptions(
+			String partitionColumnName,
+			Long partitionLowerBound,
+			Long partitionUpperBound,
+			Integer numPartitions,
+			int fetchSize) {
+		this.partitionColumnName = partitionColumnName;
+		this.partitionLowerBound = partitionLowerBound;
+		this.partitionUpperBound = partitionUpperBound;
+		this.numPartitions = numPartitions;
+
+		this.fetchSize = fetchSize;
+	}
+
+	public Optional<String> getPartitionColumnName() {
+		return Optional.ofNullable(partitionColumnName);
+	}
+
+	public Optional<Long> getPartitionLowerBound() {
+		return Optional.ofNullable(partitionLowerBound);
+	}
+
+	public Optional<Long> getPartitionUpperBound() {
+		return Optional.ofNullable(partitionUpperBound);
+	}
+
+	public Optional<Integer> getNumPartitions() {
+		return Optional.ofNullable(numPartitions);
+	}
+
+	public int getFetchSize() {
+		return fetchSize;
+	}
+
+	public static Builder builder() {
+		return new Builder();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (o instanceof JDBCReadOptions) {
+			JDBCReadOptions options = (JDBCReadOptions) o;
+			return Objects.equals(partitionColumnName, options.partitionColumnName) &&
+				Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
+				Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
+				Objects.equals(numPartitions, options.numPartitions) &&
+				Objects.equals(fetchSize, options.fetchSize);
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * Builder of {@link JDBCReadOptions}.
+	 */
+	public static class Builder {
+		private String partitionColumnName;
+		private Long partitionLowerBound;
+		private Long partitionUpperBound;
+		private Integer numPartitions;
+
+		private int fetchSize = 0;
+
+		/**
+		 * optional, name of the column used for partitioning the input.
+		 */
+		public Builder setPartitionColumnName(String partitionColumnName) {
+			this.partitionColumnName = partitionColumnName;
+			return this;
+		}
+
+		/**
+		 * optional, the smallest value of the first partition.
+		 */
+		public Builder setPartitionLowerBound(long partitionLowerBound) {
+			this.partitionLowerBound = partitionLowerBound;
+			return this;
+		}
+
+		/**
+		 * optional, the largest value of the last partition.
+		 */
+		public Builder setPartitionUpperBound(long partitionUpperBound) {
+			this.partitionUpperBound = partitionUpperBound;
+			return this;
+		}
+
+		/**
+		 * optional, the maximum number of partitions that can be used for parallelism in table reading.
+		 */
+		public Builder setNumPartitions(int numPartitions) {
+			this.numPartitions = numPartitions;
+			return this;
+		}
+
+		/**
+		 * optional, the number of rows to fetch per round trip.
+		 * default value is 0, according to the jdbc api, 0 means that fetchSize hint will be ignored.
+		 */
+		public Builder setFetchSize(int fetchSize) {
+			this.fetchSize = fetchSize;
+			return this;
+		}
+
+		public JDBCReadOptions build() {
+			return new JDBCReadOptions(
+				partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
+		}
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
index 82c3c33..56298c7 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
@@ -18,30 +18,81 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.functions.AsyncTableFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
 
+import java.util.Arrays;
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * {@link TableSource} for JDBC.
- * Now only support {@link LookupableTableSource}.
  */
-public class JDBCTableSource implements LookupableTableSource<Row> {
+public class JDBCTableSource implements
+		StreamTableSource<Row>,
+		ProjectableTableSource<Row>,
+		LookupableTableSource<Row> {
 
 	private final JDBCOptions options;
+	private final JDBCReadOptions readOptions;
 	private final JDBCLookupOptions lookupOptions;
 	private final TableSchema schema;
 
-	public JDBCTableSource(
-			JDBCOptions options, JDBCLookupOptions lookupOptions, TableSchema schema) {
+	// index of fields selected, null means that all fields are selected
+	private final int[] selectFields;
+	private final RowTypeInfo returnType;
+
+	private JDBCTableSource(
+		JDBCOptions options, JDBCReadOptions readOptions, JDBCLookupOptions lookupOptions, TableSchema schema) {
+		this(options, readOptions, lookupOptions, schema, null);
+	}
+
+	private JDBCTableSource(
+		JDBCOptions options, JDBCReadOptions readOptions, JDBCLookupOptions lookupOptions,
+		TableSchema schema, int[] selectFields) {
 		this.options = options;
+		this.readOptions = readOptions;
 		this.lookupOptions = lookupOptions;
 		this.schema = schema;
+
+		this.selectFields = selectFields;
+
+		final TypeInformation<?>[] schemaTypeInfos = schema.getFieldTypes();
+		final String[] schemaFieldNames = schema.getFieldNames();
+		if (selectFields != null) {
+			TypeInformation<?>[] typeInfos = new TypeInformation[selectFields.length];
+			String[] typeNames = new String[selectFields.length];
+			for (int i = 0; i < selectFields.length; i++) {
+				typeInfos[i] = schemaTypeInfos[selectFields[i]];
+				typeNames[i] = schemaFieldNames[selectFields[i]];
+			}
+			this.returnType = new RowTypeInfo(typeInfos, typeNames);
+		} else {
+			this.returnType = new RowTypeInfo(schemaTypeInfos, schemaFieldNames);
+		}
+	}
+
+	@Override
+	public boolean isBounded() {
+		return true;
+	}
+
+	@Override
+	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+		return execEnv.createInput(getInputFormat(), getReturnType()).name(explainSource());
 	}
 
 	@Override
@@ -49,13 +100,23 @@ public class JDBCTableSource implements LookupableTableSource<Row> {
 		return JDBCLookupFunction.builder()
 				.setOptions(options)
 				.setLookupOptions(lookupOptions)
-				.setFieldTypes(schema.getFieldTypes())
-				.setFieldNames(schema.getFieldNames())
+				.setFieldTypes(returnType.getFieldTypes())
+				.setFieldNames(returnType.getFieldNames())
 				.setKeyNames(lookupKeys)
 				.build();
 	}
 
 	@Override
+	public TypeInformation<Row> getReturnType() {
+		return returnType;
+	}
+
+	@Override
+	public TableSource<Row> projectFields(int[] fields) {
+		return new JDBCTableSource(options, readOptions, lookupOptions, schema, fields);
+	}
+
+	@Override
 	public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
 		throw new UnsupportedOperationException();
 	}
@@ -74,12 +135,57 @@ public class JDBCTableSource implements LookupableTableSource<Row> {
 		return new Builder();
 	}
 
+	private JDBCInputFormat getInputFormat() {
+		JDBCInputFormat.JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(options.getDriverName())
+				.setDBUrl(options.getDbURL())
+				.setUsername(options.getUsername())
+				.setPassword(options.getPassword())
+				.setRowTypeInfo(new RowTypeInfo(returnType.getFieldTypes(), returnType.getFieldNames()));
+
+		if (readOptions.getFetchSize() != 0) {
+			builder.setFetchSize(readOptions.getFetchSize());
+		}
+
+		final JDBCDialect dialect = options.getDialect();
+		String query = dialect.getSelectFromStatement(
+			options.getTableName(), returnType.getFieldNames(), new String[0]);
+		if (readOptions.getPartitionColumnName().isPresent()) {
+			long lowerBound = readOptions.getPartitionLowerBound().get();
+			long upperBound = readOptions.getPartitionUpperBound().get();
+			int numPartitions = readOptions.getNumPartitions().get();
+			builder.setParametersProvider(
+				new NumericBetweenParametersProvider(lowerBound, upperBound).ofBatchNum(numPartitions));
+			query += " WHERE " +
+				dialect.quoteIdentifier(readOptions.getPartitionColumnName().get()) +
+				" BETWEEN ? AND ?";
+		}
+		builder.setQuery(query);
+
+		return builder.finish();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (o instanceof JDBCTableSource) {
+			JDBCTableSource source = (JDBCTableSource) o;
+			return Objects.equals(options, source.options) &&
+				Objects.equals(readOptions, source.readOptions) &&
+				Objects.equals(lookupOptions, source.lookupOptions) &&
+				Objects.equals(schema, source.schema) &&
+				Arrays.equals(selectFields, source.selectFields);
+		} else {
+			return false;
+		}
+	}
+
 	/**
 	 * Builder for a {@link JDBCTableSource}.
 	 */
 	public static class Builder {
 
 		private JDBCOptions options;
+		private JDBCReadOptions readOptions;
 		private JDBCLookupOptions lookupOptions;
 		private TableSchema schema;
 
@@ -92,6 +198,15 @@ public class JDBCTableSource implements LookupableTableSource<Row> {
 		}
 
 		/**
+		 * optional, scan related options.
+		 * {@link JDBCReadOptions} will be only used for {@link StreamTableSource}.
+		 */
+		public Builder setReadOptions(JDBCReadOptions readOptions) {
+			this.readOptions = readOptions;
+			return this;
+		}
+
+		/**
 		 * optional, lookup related options.
 		 * {@link JDBCLookupOptions} only be used for {@link LookupableTableSource}.
 		 */
@@ -116,7 +231,13 @@ public class JDBCTableSource implements LookupableTableSource<Row> {
 		public JDBCTableSource build() {
 			checkNotNull(options, "No options supplied.");
 			checkNotNull(schema, "No schema supplied.");
-			return new JDBCTableSource(options, lookupOptions, schema);
+			if (readOptions == null) {
+				readOptions = JDBCReadOptions.builder().build();
+			}
+			if (lookupOptions == null) {
+				lookupOptions = JDBCLookupOptions.builder().build();
+			}
+			return new JDBCTableSource(options, readOptions, lookupOptions, schema);
 		}
 	}
 }
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
new file mode 100644
index 0000000..b90871c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.JDBCValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_DRIVER;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_CACHE_TTL;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_MAX_RETRIES;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_PASSWORD;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_FETCH_SIZE;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_COLUMN;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_NUM;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_TABLE;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_TYPE_VALUE_JDBC;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_URL;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_USERNAME;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_FLUSH_INTERVAL;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_FLUSH_MAX_ROWS;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_MAX_RETRIES;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+	StreamTableSourceFactory<Row>,
+	StreamTableSinkFactory<Tuple2<Boolean, Row>> {
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_JDBC); // jdbc
+		context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		List<String> properties = new ArrayList<>();
+
+		// common options
+		properties.add(CONNECTOR_DRIVER);
+		properties.add(CONNECTOR_URL);
+		properties.add(CONNECTOR_TABLE);
+		properties.add(CONNECTOR_USERNAME);
+		properties.add(CONNECTOR_PASSWORD);
+
+		// scan options
+		properties.add(CONNECTOR_READ_PARTITION_COLUMN);
+		properties.add(CONNECTOR_READ_PARTITION_NUM);
+		properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND);
+		properties.add(CONNECTOR_READ_PARTITION_UPPER_BOUND);
+		properties.add(CONNECTOR_READ_FETCH_SIZE);
+
+		// lookup options
+		properties.add(CONNECTOR_LOOKUP_CACHE_MAX_ROWS);
+		properties.add(CONNECTOR_LOOKUP_CACHE_TTL);
+		properties.add(CONNECTOR_LOOKUP_MAX_RETRIES);
+
+		// sink options
+		properties.add(CONNECTOR_WRITE_FLUSH_MAX_ROWS);
+		properties.add(CONNECTOR_WRITE_FLUSH_INTERVAL);
+		properties.add(CONNECTOR_WRITE_MAX_RETRIES);
+
+		// schema
+		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
+		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+
+		return properties;
+	}
+
+	@Override
+	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+		return JDBCTableSource.builder()
+			.setOptions(getJDBCOptions(descriptorProperties))
+			.setReadOptions(getJDBCReadOptions(descriptorProperties))
+			.setLookupOptions(getJDBCLookupOptions(descriptorProperties))
+			.setSchema(descriptorProperties.getTableSchema(SCHEMA))
+			.build();
+	}
+
+	@Override
+	public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+		final JDBCUpsertTableSink.Builder builder = JDBCUpsertTableSink.builder()
+			.setOptions(getJDBCOptions(descriptorProperties))
+			.setTableSchema(descriptorProperties.getTableSchema(SCHEMA));
+
+		descriptorProperties.getOptionalInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS).ifPresent(builder::setFlushMaxSize);
+		descriptorProperties.getOptionalDuration(CONNECTOR_WRITE_FLUSH_INTERVAL).ifPresent(
+			s -> builder.setFlushIntervalMills(s.toMillis()));
+		descriptorProperties.getOptionalInt(CONNECTOR_WRITE_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes);
+
+		return builder.build();
+	}
+
+	private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+		final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+		descriptorProperties.putProperties(properties);
+
+		new SchemaValidator(true, false, false).validate(descriptorProperties);
+		new JDBCValidator().validate(descriptorProperties);
+
+		return descriptorProperties;
+	}
+
+	private JDBCOptions getJDBCOptions(DescriptorProperties descriptorProperties) {
+		final String url = descriptorProperties.getString(CONNECTOR_URL);
+		final JDBCOptions.Builder builder = JDBCOptions.builder()
+			.setDBUrl(url)
+			.setTableName(descriptorProperties.getString(CONNECTOR_TABLE))
+			.setDialect(JDBCDialects.get(url).get());
+
+		descriptorProperties.getOptionalString(CONNECTOR_DRIVER).ifPresent(builder::setDriverName);
+		descriptorProperties.getOptionalString(CONNECTOR_USERNAME).ifPresent(builder::setUsername);
+		descriptorProperties.getOptionalString(CONNECTOR_PASSWORD).ifPresent(builder::setPassword);
+
+		return builder.build();
+	}
+
+	private JDBCReadOptions getJDBCReadOptions(DescriptorProperties descriptorProperties) {
+		final Optional<String> partitionColumnName =
+			descriptorProperties.getOptionalString(CONNECTOR_READ_PARTITION_COLUMN);
+		final Optional<Long> partitionLower = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND);
+		final Optional<Long> partitionUpper = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_UPPER_BOUND);
+		final Optional<Integer> numPartitions = descriptorProperties.getOptionalInt(CONNECTOR_READ_PARTITION_NUM);
+
+		final JDBCReadOptions.Builder builder = JDBCReadOptions.builder();
+		if (partitionColumnName.isPresent()) {
+			builder.setPartitionColumnName(partitionColumnName.get());
+			builder.setPartitionLowerBound(partitionLower.get());
+			builder.setPartitionUpperBound(partitionUpper.get());
+			builder.setNumPartitions(numPartitions.get());
+		}
+		descriptorProperties.getOptionalInt(CONNECTOR_READ_FETCH_SIZE).ifPresent(builder::setFetchSize);
+
+		return builder.build();
+	}
+
+	private JDBCLookupOptions getJDBCLookupOptions(DescriptorProperties descriptorProperties) {
+		final JDBCLookupOptions.Builder builder = JDBCLookupOptions.builder();
+
+		descriptorProperties.getOptionalLong(CONNECTOR_LOOKUP_CACHE_MAX_ROWS).ifPresent(builder::setCacheMaxSize);
+		descriptorProperties.getOptionalDuration(CONNECTOR_LOOKUP_CACHE_TTL).ifPresent(
+			s -> builder.setCacheExpireMs(s.toMillis()));
+		descriptorProperties.getOptionalInt(CONNECTOR_LOOKUP_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes);
+
+		return builder.build();
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
index 6a2df0d..f6d0b9b 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;
 
 import java.util.Arrays;
+import java.util.Objects;
 
 import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS;
 import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
@@ -145,6 +146,22 @@ public class JDBCUpsertTableSink implements UpsertStreamTableSink<Row> {
 		return new Builder();
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (o instanceof JDBCUpsertTableSink) {
+			JDBCUpsertTableSink sink = (JDBCUpsertTableSink) o;
+			return Objects.equals(schema, sink.schema) &&
+				Objects.equals(options, sink.options) &&
+				Objects.equals(flushMaxSize, sink.flushMaxSize) &&
+				Objects.equals(flushIntervalMills, sink.flushIntervalMills) &&
+				Objects.equals(maxRetryTime, sink.maxRetryTime) &&
+				Arrays.equals(keyFields, sink.keyFields) &&
+				Objects.equals(isAppendOnly, sink.isAppendOnly);
+		} else {
+			return false;
+		}
+	}
+
 	/**
 	 * Builder for a {@link JDBCUpsertTableSink}.
 	 */
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
index 970fbb9..8c53eac 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
@@ -126,6 +126,6 @@ public interface JDBCDialect extends Serializable {
 				.map(f -> quoteIdentifier(f) + "=?")
 				.collect(Collectors.joining(" AND "));
 		return "SELECT " + selectExpressions + " FROM " +
-				quoteIdentifier(tableName) + " WHERE " + fieldExpressions;
+				quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
 	}
 }
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
index 4b8ecd6..0e9011d 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.java.io.jdbc.split;
 
-import java.io.Serializable;
+import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
+import java.io.Serializable;
 
 /**
  * This query parameters generator is an helper class to parameterize from/to queries on a numeric column.
@@ -29,19 +29,32 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
  * <PRE>
- *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
  * </PRE>
  *
  * <p>You can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
  * based on the passed constructor parameters.
- *
  */
 public class NumericBetweenParametersProvider implements ParameterValuesProvider {
 
-	private final long fetchSize;
 	private final long minVal;
 	private final long maxVal;
 
+	private long batchSize;
+	private int batchNum;
+
+	/**
+	 * NumericBetweenParametersProvider constructor.
+	 *
+	 * @param minVal the lower bound of the produced "from" values
+	 * @param maxVal the upper bound of the produced "to" values
+	 */
+	public NumericBetweenParametersProvider(long minVal, long maxVal) {
+		Preconditions.checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
+		this.minVal = minVal;
+		this.maxVal = maxVal;
+	}
+
 	/**
 	 * NumericBetweenParametersProvider constructor.
 	 *
@@ -50,27 +63,51 @@ public class NumericBetweenParametersProvider implements ParameterValuesProvider
 	 * @param maxVal the upper bound of the produced "to" values
 	 */
 	public NumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) {
-		checkArgument(fetchSize > 0, "Fetch size must be greater than 0.");
-		checkArgument(minVal <= maxVal, "Min value cannot be greater than max value.");
-		this.fetchSize = fetchSize;
+		Preconditions.checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
 		this.minVal = minVal;
 		this.maxVal = maxVal;
+		ofBatchSize(fetchSize);
+	}
+
+	public NumericBetweenParametersProvider ofBatchSize(long batchSize) {
+		Preconditions.checkArgument(batchSize > 0, "Batch size must be positive");
+
+		long maxElemCount = (maxVal - minVal) + 1;
+		if (batchSize > maxElemCount) {
+			batchSize = maxElemCount;
+		}
+		this.batchSize = batchSize;
+		this.batchNum = new Double(Math.ceil((double) maxElemCount / batchSize)).intValue();
+		return this;
+	}
+
+	public NumericBetweenParametersProvider ofBatchNum(int batchNum) {
+		Preconditions.checkArgument(batchNum > 0, "Batch number must be positive");
+
+		long maxElemCount = (maxVal - minVal) + 1;
+		if (batchNum > maxElemCount) {
+			batchNum = (int) maxElemCount;
+		}
+		this.batchNum = batchNum;
+		this.batchSize = new Double(Math.ceil((double) maxElemCount / batchNum)).longValue();
+		return this;
 	}
 
 	@Override
 	public Serializable[][] getParameterValues() {
-		double maxElemCount = (maxVal - minVal) + 1;
-		int numBatches = new Double(Math.ceil(maxElemCount / fetchSize)).intValue();
-		Serializable[][] parameters = new Serializable[numBatches][2];
-		int batchIndex = 0;
-		for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) {
-			long end = start + fetchSize - 1;
-			if (end > maxVal) {
-				end = maxVal;
-			}
-			parameters[batchIndex] = new Long[]{start, end};
+		Preconditions.checkState(batchSize > 0,
+			"Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?");
+
+		long maxElemCount = (maxVal - minVal) + 1;
+		long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
+
+		Serializable[][] parameters = new Serializable[batchNum][2];
+		long start = minVal;
+		for (int i = 0; i < batchNum; i++) {
+			long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
+			parameters[i] = new Long[]{start, end};
+			start = end + 1;
 		}
 		return parameters;
 	}
-
 }
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java
new file mode 100644
index 0000000..b0a7c18
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+
+/**
+ * The validator for JDBC.
+ */
+@Internal
+public class JDBCValidator extends ConnectorDescriptorValidator {
+
+	public static final String CONNECTOR_TYPE_VALUE_JDBC = "jdbc";
+
+	public static final String CONNECTOR_URL = "connector.url";
+	public static final String CONNECTOR_TABLE = "connector.table";
+	public static final String CONNECTOR_DRIVER = "connector.driver";
+	public static final String CONNECTOR_USERNAME = "connector.username";
+	public static final String CONNECTOR_PASSWORD = "connector.password";
+
+	public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column";
+	public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = "connector.read.partition.lower-bound";
+	public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = "connector.read.partition.upper-bound";
+	public static final String CONNECTOR_READ_PARTITION_NUM = "connector.read.partition.num";
+	public static final String CONNECTOR_READ_FETCH_SIZE = "connector.read.fetch-size";
+
+	public static final String CONNECTOR_LOOKUP_CACHE_MAX_ROWS = "connector.lookup.cache.max-rows";
+	public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl";
+	public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries";
+
+	public static final String CONNECTOR_WRITE_FLUSH_MAX_ROWS = "connector.write.flush.max-rows";
+	public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval";
+	public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries";
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		super.validate(properties);
+		validateCommonProperties(properties);
+		validateReadProperties(properties);
+		validateLookupProperties(properties);
+		validateSinkProperties(properties);
+	}
+
+	private void validateCommonProperties(DescriptorProperties properties) {
+		properties.validateString(CONNECTOR_URL, false, 1);
+		properties.validateString(CONNECTOR_TABLE, false, 1);
+		properties.validateString(CONNECTOR_DRIVER, true);
+		properties.validateString(CONNECTOR_USERNAME, true);
+		properties.validateString(CONNECTOR_PASSWORD, true);
+
+		final String url = properties.getString(CONNECTOR_URL);
+		final Optional<JDBCDialect> dialect = JDBCDialects.get(url);
+		Preconditions.checkState(dialect.isPresent(), "Cannot handle such jdbc url: " + url);
+
+		Optional<String> password = properties.getOptionalString(CONNECTOR_PASSWORD);
+		if (password.isPresent()) {
+			Preconditions.checkArgument(
+				properties.getOptionalString(CONNECTOR_USERNAME).isPresent(),
+				"Database username must be provided when database password is provided");
+		}
+	}
+
+	private void validateReadProperties(DescriptorProperties properties) {
+		properties.validateString(CONNECTOR_READ_PARTITION_COLUMN, true);
+		properties.validateLong(CONNECTOR_READ_PARTITION_LOWER_BOUND, true);
+		properties.validateLong(CONNECTOR_READ_PARTITION_UPPER_BOUND, true);
+		properties.validateInt(CONNECTOR_READ_PARTITION_NUM, true);
+		properties.validateInt(CONNECTOR_READ_FETCH_SIZE, true);
+
+		Optional<Long> lowerBound = properties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND);
+		Optional<Long> upperBound = properties.getOptionalLong(CONNECTOR_READ_PARTITION_UPPER_BOUND);
+		if (lowerBound.isPresent() && upperBound.isPresent()) {
+			Preconditions.checkArgument(lowerBound.get() <= upperBound.get(),
+				CONNECTOR_READ_PARTITION_LOWER_BOUND + " must not be larger than " + CONNECTOR_READ_PARTITION_UPPER_BOUND);
+		}
+
+		checkAllOrNone(properties, new String[]{
+			CONNECTOR_READ_PARTITION_COLUMN,
+			CONNECTOR_READ_PARTITION_LOWER_BOUND,
+			CONNECTOR_READ_PARTITION_UPPER_BOUND,
+			CONNECTOR_READ_PARTITION_NUM
+		});
+	}
+
+	private void validateLookupProperties(DescriptorProperties properties) {
+		properties.validateLong(CONNECTOR_LOOKUP_CACHE_MAX_ROWS, true);
+		properties.validateDuration(CONNECTOR_LOOKUP_CACHE_TTL, true, 1);
+		properties.validateInt(CONNECTOR_LOOKUP_MAX_RETRIES, true);
+
+		checkAllOrNone(properties, new String[]{
+			CONNECTOR_LOOKUP_CACHE_MAX_ROWS,
+			CONNECTOR_LOOKUP_CACHE_TTL
+		});
+	}
+
+	private void validateSinkProperties(DescriptorProperties properties) {
+		properties.validateInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS, true);
+		properties.validateDuration(CONNECTOR_WRITE_FLUSH_INTERVAL, true, 1);
+		properties.validateInt(CONNECTOR_WRITE_MAX_RETRIES, true);
+	}
+
+	private void checkAllOrNone(DescriptorProperties properties, String[] propertyNames) {
+		int presentCount = 0;
+		for (String name : propertyNames) {
+			if (properties.getOptionalString(name).isPresent()) {
+				presentCount++;
+			}
+		}
+		Preconditions.checkArgument(presentCount == 0 || presentCount == propertyNames.length,
+			"Either all or none of the following properties should be provided:\n" + String.join("\n", propertyNames));
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..dbd648d
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index 91b175d..14dc85a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -85,7 +85,7 @@ public class JDBCFullTest extends JDBCTestBase {
 			//use a "splittable" query to exploit parallelism
 			inputBuilder = inputBuilder
 					.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
-					.setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
+					.setParametersProvider(new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize));
 		}
 		DataSet<Row> source = environment.createInput(inputBuilder.finish());
 
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 81320e6..559976d 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -221,7 +221,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		final int fetchSize = 1;
 		final long min = TEST_DATA[0].id;
 		final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
-		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
+		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl(DB_URL)
@@ -257,7 +257,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 		final long min = TEST_DATA[0].id;
 		final long max = TEST_DATA[TEST_DATA.length - 1].id;
 		final long fetchSize = max + 1; //generate a single split
-		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
+		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 				.setDrivername(DRIVER_CLASS)
 				.setDBUrl(DB_URL)
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..33c7b68
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for {@link JDBCTableSource} and {@link JDBCUpsertTableSink} created
+ * by {@link JDBCTableSourceSinkFactory}.
+ */
+public class JDBCTableSourceSinkFactoryTest {
+
+	@Test
+	public void testJDBCCommonProperties() {
+		Map<String, String> properties = getBasicProperties();
+		properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
+		properties.put("connector.username", "user");
+		properties.put("connector.password", "pass");
+
+		final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
+			.createStreamTableSource(properties);
+
+		final JDBCOptions options = JDBCOptions.builder()
+			.setDBUrl("jdbc:derby:memory:mydb")
+			.setTableName("mytable")
+			.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
+			.setUsername("user")
+			.setPassword("pass")
+			.build();
+		final TableSchema schema = TableSchema.builder()
+			.field("aaa", DataTypes.INT())
+			.field("bbb", DataTypes.STRING())
+			.field("ccc", DataTypes.DOUBLE())
+			.build();
+		final JDBCTableSource expected = JDBCTableSource.builder()
+			.setOptions(options)
+			.setSchema(schema)
+			.build();
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testJDBCReadProperties() {
+		Map<String, String> properties = getBasicProperties();
+		properties.put("connector.read.partition.column", "aaa");
+		properties.put("connector.read.partition.lower-bound", "-10");
+		properties.put("connector.read.partition.upper-bound", "100");
+		properties.put("connector.read.partition.num", "10");
+		properties.put("connector.read.fetch-size", "20");
+
+		final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
+			.createStreamTableSource(properties);
+
+		final JDBCOptions options = JDBCOptions.builder()
+			.setDBUrl("jdbc:derby:memory:mydb")
+			.setTableName("mytable")
+			.build();
+		final JDBCReadOptions readOptions = JDBCReadOptions.builder()
+			.setPartitionColumnName("aaa")
+			.setPartitionLowerBound(-10)
+			.setPartitionUpperBound(100)
+			.setNumPartitions(10)
+			.setFetchSize(20)
+			.build();
+		final TableSchema schema = TableSchema.builder()
+			.field("aaa", DataTypes.INT())
+			.field("bbb", DataTypes.STRING())
+			.field("ccc", DataTypes.DOUBLE())
+			.build();
+		final JDBCTableSource expected = JDBCTableSource.builder()
+			.setOptions(options)
+			.setReadOptions(readOptions)
+			.setSchema(schema)
+			.build();
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testJDBCLookupProperties() {
+		Map<String, String> properties = getBasicProperties();
+		properties.put("connector.lookup.cache.max-rows", "1000");
+		properties.put("connector.lookup.cache.ttl", "10s");
+		properties.put("connector.lookup.max-retries", "10");
+
+		final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
+			.createStreamTableSource(properties);
+
+		final JDBCOptions options = JDBCOptions.builder()
+			.setDBUrl("jdbc:derby:memory:mydb")
+			.setTableName("mytable")
+			.build();
+		final JDBCLookupOptions lookupOptions = JDBCLookupOptions.builder()
+			.setCacheMaxSize(1000)
+			.setCacheExpireMs(10_000)
+			.setMaxRetryTimes(10)
+			.build();
+		final TableSchema schema = TableSchema.builder()
+			.field("aaa", DataTypes.INT())
+			.field("bbb", DataTypes.STRING())
+			.field("ccc", DataTypes.DOUBLE())
+			.build();
+		final JDBCTableSource expected = JDBCTableSource.builder()
+			.setOptions(options)
+			.setLookupOptions(lookupOptions)
+			.setSchema(schema)
+			.build();
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testJDBCSinkProperties() {
+		Map<String, String> properties = getBasicProperties();
+		properties.put("connector.write.flush.max-rows", "1000");
+		properties.put("connector.write.flush.interval", "2min");
+		properties.put("connector.write.max-retries", "5");
+
+		final StreamTableSink<?> actual = TableFactoryService.find(StreamTableSinkFactory.class, properties)
+			.createStreamTableSink(properties);
+
+		final JDBCOptions options = JDBCOptions.builder()
+			.setDBUrl("jdbc:derby:memory:mydb")
+			.setTableName("mytable")
+			.build();
+		final TableSchema schema = TableSchema.builder()
+			.field("aaa", DataTypes.INT())
+			.field("bbb", DataTypes.STRING())
+			.field("ccc", DataTypes.DOUBLE())
+			.build();
+		final JDBCUpsertTableSink expected = JDBCUpsertTableSink.builder()
+			.setOptions(options)
+			.setTableSchema(schema)
+			.setFlushMaxSize(1000)
+			.setFlushIntervalMills(120_000)
+			.setMaxRetryTimes(5)
+			.build();
+
+		assertEquals(expected, actual);
+	}
+
+	@Test
+	public void testJDBCWithFilter() {
+		Map<String, String> properties = getBasicProperties();
+		properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
+		properties.put("connector.username", "user");
+		properties.put("connector.password", "pass");
+
+		final TableSource<?> actual = ((JDBCTableSource) TableFactoryService
+			.find(StreamTableSourceFactory.class, properties)
+			.createStreamTableSource(properties))
+			.projectFields(new int[] {0, 2});
+
+		Map<String, DataType> projectedFields = ((FieldsDataType) actual.getProducedDataType()).getFieldDataTypes();
+		assertEquals(projectedFields.get("aaa"), DataTypes.INT());
+		assertNull(projectedFields.get("bbb"));
+		assertEquals(projectedFields.get("ccc"), DataTypes.DOUBLE());
+	}
+
+	@Test
+	public void testJDBCValidation() {
+		// only password, no username
+		try {
+			Map<String, String> properties = getBasicProperties();
+			properties.put("connector.password", "pass");
+
+			TableFactoryService.find(StreamTableSourceFactory.class, properties)
+				.createStreamTableSource(properties);
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// read partition properties not complete
+		try {
+			Map<String, String> properties = getBasicProperties();
+			properties.put("connector.read.partition.column", "aaa");
+			properties.put("connector.read.partition.lower-bound", "-10");
+			properties.put("connector.read.partition.upper-bound", "100");
+
+			TableFactoryService.find(StreamTableSourceFactory.class, properties)
+				.createStreamTableSource(properties);
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// read partition lower-bound > upper-bound
+		try {
+			Map<String, String> properties = getBasicProperties();
+			properties.put("connector.read.partition.column", "aaa");
+			properties.put("connector.read.partition.lower-bound", "100");
+			properties.put("connector.read.partition.upper-bound", "-10");
+			properties.put("connector.read.partition.num", "10");
+
+			TableFactoryService.find(StreamTableSourceFactory.class, properties)
+				.createStreamTableSource(properties);
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// lookup cache properties not complete
+		try {
+			Map<String, String> properties = getBasicProperties();
+			properties.put("connector.lookup.cache.max-rows", "10");
+
+			TableFactoryService.find(StreamTableSourceFactory.class, properties)
+				.createStreamTableSource(properties);
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+
+		// lookup cache properties not complete
+		try {
+			Map<String, String> properties = getBasicProperties();
+			properties.put("connector.lookup.cache.ttl", "1s");
+
+			TableFactoryService.find(StreamTableSourceFactory.class, properties)
+				.createStreamTableSource(properties);
+			fail("exception expected");
+		} catch (IllegalArgumentException ignored) {
+		}
+	}
+
+	private Map<String, String> getBasicProperties() {
+		Map<String, String> properties = new HashMap<>();
+
+		properties.put("connector.type", "jdbc");
+		properties.put("connector.property-version", "1");
+
+		properties.put("connector.url", "jdbc:derby:memory:mydb");
+		properties.put("connector.table", "mytable");
+
+		properties.put("schema.0.name", "aaa");
+		properties.put("schema.0.type", "INT");
+		properties.put("schema.1.name", "bbb");
+		properties.put("schema.1.type", "VARCHAR");
+		properties.put("schema.2.name", "ccc");
+		properties.put("schema.2.type", "DOUBLE");
+
+		return properties;
+	}
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java
new file mode 100644
index 0000000..51f77e7
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.split;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link NumericBetweenParametersProvider}.
+ */
+public class NumericBetweenParametersProviderTest {
+
+	@Test
+	public void testBatchSizeDivisible() {
+		NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 9).ofBatchSize(3);
+		Serializable[][] actual = provider.getParameterValues();
+
+		long[][] expected = {
+			new long[]{-5, -3},
+			new long[]{-2, 0},
+			new long[]{1, 3},
+			new long[]{4, 6},
+			new long[]{7, 9}
+		};
+		check(expected, actual);
+	}
+
+	@Test
+	public void testBatchSizeNotDivisible() {
+		NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 11).ofBatchSize(4);
+		Serializable[][] actual = provider.getParameterValues();
+
+		long[][] expected = {
+			new long[]{-5, -2},
+			new long[]{-1, 2},
+			new long[]{3, 5},
+			new long[]{6, 8},
+			new long[]{9, 11}
+		};
+		check(expected, actual);
+	}
+
+	@Test
+	public void testBatchSizeTooLarge() {
+		NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(0, 2).ofBatchSize(5);
+		Serializable[][] actual = provider.getParameterValues();
+
+		long[][] expected = {new long[]{0, 2}};
+		check(expected, actual);
+	}
+
+	@Test
+	public void testBatchNumDivisible() {
+		NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 9).ofBatchNum(5);
+		Serializable[][] actual = provider.getParameterValues();
+
+		long[][] expected = {
+			new long[]{-5, -3},
+			new long[]{-2, 0},
+			new long[]{1, 3},
+			new long[]{4, 6},
+			new long[]{7, 9}
+		};
+		check(expected, actual);
+	}
+
+	@Test
+	public void testBatchNumNotDivisible() {
+		NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 11).ofBatchNum(5);
+		Serializable[][] actual = provider.getParameterValues();
+
+		long[][] expected = {
+			new long[]{-5, -2},
+			new long[]{-1, 2},
+			new long[]{3, 5},
+			new long[]{6, 8},
+			new long[]{9, 11}
+		};
+		check(expected, actual);
+	}
+
+	@Test
+	public void testBatchNumTooLarge() {
+		NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(0, 2).ofBatchNum(5);
+		Serializable[][] actual = provider.getParameterValues();
+
+		long[][] expected = {
+			new long[]{0, 0},
+			new long[]{1, 1},
+			new long[]{2, 2}};
+		check(expected, actual);
+	}
+
+	private void check(long[][] expected, Serializable[][] actual) {
+		assertEquals(expected.length, actual.length);
+		for (int i = 0; i < expected.length; i++) {
+			for (int j = 0; j < 2; j++) {
+				assertEquals(expected[i][j], ((Long) actual[i][j]).longValue());
+			}
+		}
+	}
+
+}