Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/11 16:11:43 UTC
[flink] branch master updated: [FLINK-13118][jdbc] Introduce JDBC
table factory and bridge JDBC table source with streaming table source
(#9029)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a91d951 [FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source (#9029)
a91d951 is described below
commit a91d9515dc3e2e694626a5fe2625a9d7402f67ef
Author: TsReaper <ts...@gmail.com>
AuthorDate: Fri Jul 12 00:11:25 2019 +0800
[FLINK-13118][jdbc] Introduce JDBC table factory and bridge JDBC table source with streaming table source (#9029)
---
.../flink/api/java/io/jdbc/JDBCLookupFunction.java | 2 +-
.../flink/api/java/io/jdbc/JDBCLookupOptions.java | 15 +-
.../apache/flink/api/java/io/jdbc/JDBCOptions.java | 18 +-
.../flink/api/java/io/jdbc/JDBCReadOptions.java | 146 +++++++++++
.../flink/api/java/io/jdbc/JDBCTableSource.java | 135 +++++++++-
.../java/io/jdbc/JDBCTableSourceSinkFactory.java | 192 ++++++++++++++
.../api/java/io/jdbc/JDBCUpsertTableSink.java | 17 ++
.../api/java/io/jdbc/dialect/JDBCDialect.java | 2 +-
.../split/NumericBetweenParametersProvider.java | 75 ++++--
.../flink/table/descriptors/JDBCValidator.java | 133 ++++++++++
.../org.apache.flink.table.factories.TableFactory | 16 ++
.../flink/api/java/io/jdbc/JDBCFullTest.java | 2 +-
.../api/java/io/jdbc/JDBCInputFormatTest.java | 4 +-
.../io/jdbc/JDBCTableSourceSinkFactoryTest.java | 276 +++++++++++++++++++++
.../NumericBetweenParametersProviderTest.java | 122 +++++++++
15 files changed, 1122 insertions(+), 33 deletions(-)
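For orientation, the new factory is discovered via Java SPI (see the META-INF/services entry below) and matched on connector.type=jdbc; everything else is configured through string properties. A minimal sketch of obtaining a source from such properties, mirroring the factory tests at the end of this patch (the Derby URL, table name, and schema are placeholders):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.factories.StreamTableSourceFactory;
    import org.apache.flink.table.factories.TableFactoryService;
    import org.apache.flink.table.sources.StreamTableSource;

    Map<String, String> props = new HashMap<>();
    props.put("connector.type", "jdbc");
    props.put("connector.property-version", "1");
    props.put("connector.url", "jdbc:derby:memory:mydb"); // placeholder URL
    props.put("connector.table", "mytable");              // placeholder table
    props.put("schema.0.name", "id");                     // placeholder schema
    props.put("schema.0.type", "INT");

    // resolves to JDBCTableSourceSinkFactory and builds a JDBCTableSource
    StreamTableSource<?> source = TableFactoryService
        .find(StreamTableSourceFactory.class, props)
        .createStreamTableSource(props);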
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
index 4a46441..80d6127 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
@@ -114,7 +114,7 @@ public class JDBCLookupFunction extends TableFunction<Row> {
try {
establishConnection();
statement = dbConn.prepareStatement(query);
- this.cache = cacheMaxSize == -1 ? null : CacheBuilder.newBuilder()
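+ // the cache is disabled unless both a maximum size and an expiry time are
+ // configured (-1 means "not set" for either option)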
+ this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
index dd6dbbe..6137324 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupOptions.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.java.io.jdbc;
import java.io.Serializable;
+import java.util.Objects;
import static org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.DEFAULT_MAX_RETRY_TIMES;
@@ -53,8 +54,20 @@ public class JDBCLookupOptions implements Serializable {
return new Builder();
}
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof JDBCLookupOptions) {
+ JDBCLookupOptions options = (JDBCLookupOptions) o;
+ return Objects.equals(cacheMaxSize, options.cacheMaxSize) &&
+ Objects.equals(cacheExpireMs, options.cacheExpireMs) &&
+ Objects.equals(maxRetryTimes, options.maxRetryTimes);
+ } else {
+ return false;
+ }
+ }
+
/**
- * Builder of {@link JDBCOptions}.
+ * Builder of {@link JDBCLookupOptions}.
*/
public static class Builder {
private long cacheMaxSize = -1L;
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
index e026a32..f68b14a 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOptions.java
@@ -21,12 +21,13 @@ package org.apache.flink.api.java.io.jdbc;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import java.util.Objects;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Options for the JDBC connector.
+ * Common options of {@link JDBCReadOptions} and {@link JDBCLookupOptions} for the JDBC connector.
*/
public class JDBCOptions {
@@ -75,6 +76,21 @@ public class JDBCOptions {
return new Builder();
}
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof JDBCOptions) {
+ JDBCOptions options = (JDBCOptions) o;
+ return Objects.equals(dbURL, options.dbURL) &&
+ Objects.equals(tableName, options.tableName) &&
+ Objects.equals(driverName, options.driverName) &&
+ Objects.equals(username, options.username) &&
+ Objects.equals(password, options.password) &&
+ Objects.equals(dialect.getClass().getName(), options.dialect.getClass().getName());
+ } else {
+ return false;
+ }
+ }
+
/**
* Builder of {@link JDBCOptions}.
*/
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java
new file mode 100644
index 0000000..4591e10
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCReadOptions.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Options for the JDBC scan.
+ */
+public class JDBCReadOptions implements Serializable {
+
+ private final String partitionColumnName;
+ private final Long partitionLowerBound;
+ private final Long partitionUpperBound;
+ private final Integer numPartitions;
+
+ private final int fetchSize;
+
+ private JDBCReadOptions(
+ String partitionColumnName,
+ Long partitionLowerBound,
+ Long partitionUpperBound,
+ Integer numPartitions,
+ int fetchSize) {
+ this.partitionColumnName = partitionColumnName;
+ this.partitionLowerBound = partitionLowerBound;
+ this.partitionUpperBound = partitionUpperBound;
+ this.numPartitions = numPartitions;
+
+ this.fetchSize = fetchSize;
+ }
+
+ public Optional<String> getPartitionColumnName() {
+ return Optional.ofNullable(partitionColumnName);
+ }
+
+ public Optional<Long> getPartitionLowerBound() {
+ return Optional.ofNullable(partitionLowerBound);
+ }
+
+ public Optional<Long> getPartitionUpperBound() {
+ return Optional.ofNullable(partitionUpperBound);
+ }
+
+ public Optional<Integer> getNumPartitions() {
+ return Optional.ofNullable(numPartitions);
+ }
+
+ public int getFetchSize() {
+ return fetchSize;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof JDBCReadOptions) {
+ JDBCReadOptions options = (JDBCReadOptions) o;
+ return Objects.equals(partitionColumnName, options.partitionColumnName) &&
+ Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
+ Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
+ Objects.equals(numPartitions, options.numPartitions) &&
+ Objects.equals(fetchSize, options.fetchSize);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Builder of {@link JDBCReadOptions}.
+ */
+ public static class Builder {
+ private String partitionColumnName;
+ private Long partitionLowerBound;
+ private Long partitionUpperBound;
+ private Integer numPartitions;
+
+ private int fetchSize = 0;
+
+ /**
+ * optional, name of the column used for partitioning the input.
+ */
+ public Builder setPartitionColumnName(String partitionColumnName) {
+ this.partitionColumnName = partitionColumnName;
+ return this;
+ }
+
+ /**
+ * optional, the smallest value of the first partition.
+ */
+ public Builder setPartitionLowerBound(long partitionLowerBound) {
+ this.partitionLowerBound = partitionLowerBound;
+ return this;
+ }
+
+ /**
+ * optional, the largest value of the last partition.
+ */
+ public Builder setPartitionUpperBound(long partitionUpperBound) {
+ this.partitionUpperBound = partitionUpperBound;
+ return this;
+ }
+
+ /**
+ * optional, the maximum number of partitions that can be used for parallelism in table reading.
+ */
+ public Builder setNumPartitions(int numPartitions) {
+ this.numPartitions = numPartitions;
+ return this;
+ }
+
+ /**
+ * optional, the number of rows to fetch per round trip.
+ * default value is 0; according to the JDBC API, a value of 0 means the fetch size hint will be ignored.
+ */
+ public Builder setFetchSize(int fetchSize) {
+ this.fetchSize = fetchSize;
+ return this;
+ }
+
+ public JDBCReadOptions build() {
+ return new JDBCReadOptions(
+ partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
+ }
+ }
+}
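The partition settings above are meant to be used together; the factory and validator added below enforce this for property-based configuration. A sketch of building the options directly through the builder, with placeholder values borrowed from the factory test in this patch:

    JDBCReadOptions readOptions = JDBCReadOptions.builder()
        .setPartitionColumnName("id") // hypothetical numeric column
        .setPartitionLowerBound(-10)
        .setPartitionUpperBound(100)
        .setNumPartitions(10)
        .setFetchSize(20)
        .build();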
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
index 82c3c33..56298c7 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSource.java
@@ -18,30 +18,81 @@
package org.apache.flink.api.java.io.jdbc;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
+import java.util.Arrays;
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* {@link TableSource} for JDBC.
- * Now only support {@link LookupableTableSource}.
*/
-public class JDBCTableSource implements LookupableTableSource<Row> {
+public class JDBCTableSource implements
+ StreamTableSource<Row>,
+ ProjectableTableSource<Row>,
+ LookupableTableSource<Row> {
private final JDBCOptions options;
+ private final JDBCReadOptions readOptions;
private final JDBCLookupOptions lookupOptions;
private final TableSchema schema;
- public JDBCTableSource(
- JDBCOptions options, JDBCLookupOptions lookupOptions, TableSchema schema) {
+ // indices of the selected fields; null means all fields are selected
+ private final int[] selectFields;
+ private final RowTypeInfo returnType;
+
+ private JDBCTableSource(
+ JDBCOptions options, JDBCReadOptions readOptions, JDBCLookupOptions lookupOptions, TableSchema schema) {
+ this(options, readOptions, lookupOptions, schema, null);
+ }
+
+ private JDBCTableSource(
+ JDBCOptions options, JDBCReadOptions readOptions, JDBCLookupOptions lookupOptions,
+ TableSchema schema, int[] selectFields) {
this.options = options;
+ this.readOptions = readOptions;
this.lookupOptions = lookupOptions;
this.schema = schema;
+
+ this.selectFields = selectFields;
+
+ final TypeInformation<?>[] schemaTypeInfos = schema.getFieldTypes();
+ final String[] schemaFieldNames = schema.getFieldNames();
+ if (selectFields != null) {
+ TypeInformation<?>[] typeInfos = new TypeInformation[selectFields.length];
+ String[] fieldNames = new String[selectFields.length];
+ for (int i = 0; i < selectFields.length; i++) {
+ typeInfos[i] = schemaTypeInfos[selectFields[i]];
+ fieldNames[i] = schemaFieldNames[selectFields[i]];
+ }
+ this.returnType = new RowTypeInfo(typeInfos, fieldNames);
+ } else {
+ this.returnType = new RowTypeInfo(schemaTypeInfos, schemaFieldNames);
+ }
+ }
+
+ @Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ @Override
+ public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+ return execEnv.createInput(getInputFormat(), getReturnType()).name(explainSource());
}
@Override
@@ -49,13 +100,23 @@ public class JDBCTableSource implements LookupableTableSource<Row> {
return JDBCLookupFunction.builder()
.setOptions(options)
.setLookupOptions(lookupOptions)
- .setFieldTypes(schema.getFieldTypes())
- .setFieldNames(schema.getFieldNames())
+ .setFieldTypes(returnType.getFieldTypes())
+ .setFieldNames(returnType.getFieldNames())
.setKeyNames(lookupKeys)
.build();
}
@Override
+ public TypeInformation<Row> getReturnType() {
+ return returnType;
+ }
+
+ @Override
+ public TableSource<Row> projectFields(int[] fields) {
+ return new JDBCTableSource(options, readOptions, lookupOptions, schema, fields);
+ }
+
+ @Override
public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
throw new UnsupportedOperationException();
}
@@ -74,12 +135,57 @@ public class JDBCTableSource implements LookupableTableSource<Row> {
return new Builder();
}
+ private JDBCInputFormat getInputFormat() {
+ JDBCInputFormat.JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat()
+ .setDrivername(options.getDriverName())
+ .setDBUrl(options.getDbURL())
+ .setUsername(options.getUsername())
+ .setPassword(options.getPassword())
+ .setRowTypeInfo(new RowTypeInfo(returnType.getFieldTypes(), returnType.getFieldNames()));
+
+ if (readOptions.getFetchSize() != 0) {
+ builder.setFetchSize(readOptions.getFetchSize());
+ }
+
+ final JDBCDialect dialect = options.getDialect();
+ String query = dialect.getSelectFromStatement(
+ options.getTableName(), returnType.getFieldNames(), new String[0]);
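+ // if a partition column is configured, append a parameterized BETWEEN
+ // predicate; NumericBetweenParametersProvider fills in the per-split bounds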
+ if (readOptions.getPartitionColumnName().isPresent()) {
+ long lowerBound = readOptions.getPartitionLowerBound().get();
+ long upperBound = readOptions.getPartitionUpperBound().get();
+ int numPartitions = readOptions.getNumPartitions().get();
+ builder.setParametersProvider(
+ new NumericBetweenParametersProvider(lowerBound, upperBound).ofBatchNum(numPartitions));
+ query += " WHERE " +
+ dialect.quoteIdentifier(readOptions.getPartitionColumnName().get()) +
+ " BETWEEN ? AND ?";
+ }
+ builder.setQuery(query);
+
+ return builder.finish();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof JDBCTableSource) {
+ JDBCTableSource source = (JDBCTableSource) o;
+ return Objects.equals(options, source.options) &&
+ Objects.equals(readOptions, source.readOptions) &&
+ Objects.equals(lookupOptions, source.lookupOptions) &&
+ Objects.equals(schema, source.schema) &&
+ Arrays.equals(selectFields, source.selectFields);
+ } else {
+ return false;
+ }
+ }
+
/**
* Builder for a {@link JDBCTableSource}.
*/
public static class Builder {
private JDBCOptions options;
+ private JDBCReadOptions readOptions;
private JDBCLookupOptions lookupOptions;
private TableSchema schema;
@@ -92,6 +198,15 @@ public class JDBCTableSource implements LookupableTableSource<Row> {
}
/**
+ * optional, scan-related options.
+ * {@link JDBCReadOptions} will only be used for {@link StreamTableSource}.
+ */
+ public Builder setReadOptions(JDBCReadOptions readOptions) {
+ this.readOptions = readOptions;
+ return this;
+ }
+
+ /**
* optional, lookup-related options.
* {@link JDBCLookupOptions} will only be used for {@link LookupableTableSource}.
*/
@@ -116,7 +231,13 @@ public class JDBCTableSource implements LookupableTableSource<Row> {
public JDBCTableSource build() {
checkNotNull(options, "No options supplied.");
checkNotNull(schema, "No schema supplied.");
- return new JDBCTableSource(options, lookupOptions, schema);
+ if (readOptions == null) {
+ readOptions = JDBCReadOptions.builder().build();
+ }
+ if (lookupOptions == null) {
+ lookupOptions = JDBCLookupOptions.builder().build();
+ }
+ return new JDBCTableSource(options, readOptions, lookupOptions, schema);
}
}
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
new file mode 100644
index 0000000..b90871c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactory.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.JDBCValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_DRIVER;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_CACHE_MAX_ROWS;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_CACHE_TTL;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_LOOKUP_MAX_RETRIES;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_PASSWORD;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_FETCH_SIZE;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_COLUMN;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_NUM;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_TABLE;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_TYPE_VALUE_JDBC;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_URL;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_USERNAME;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_FLUSH_INTERVAL;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_FLUSH_MAX_ROWS;
+import static org.apache.flink.table.descriptors.JDBCValidator.CONNECTOR_WRITE_MAX_RETRIES;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
+
+/**
+ * Factory for creating configured instances of {@link JDBCTableSource} and {@link JDBCUpsertTableSink}.
+ */
+public class JDBCTableSourceSinkFactory implements
+ StreamTableSourceFactory<Row>,
+ StreamTableSinkFactory<Tuple2<Boolean, Row>> {
+
+ @Override
+ public Map<String, String> requiredContext() {
+ Map<String, String> context = new HashMap<>();
+ context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_JDBC); // jdbc
+ context.put(CONNECTOR_PROPERTY_VERSION, "1"); // backwards compatibility
+ return context;
+ }
+
+ @Override
+ public List<String> supportedProperties() {
+ List<String> properties = new ArrayList<>();
+
+ // common options
+ properties.add(CONNECTOR_DRIVER);
+ properties.add(CONNECTOR_URL);
+ properties.add(CONNECTOR_TABLE);
+ properties.add(CONNECTOR_USERNAME);
+ properties.add(CONNECTOR_PASSWORD);
+
+ // scan options
+ properties.add(CONNECTOR_READ_PARTITION_COLUMN);
+ properties.add(CONNECTOR_READ_PARTITION_NUM);
+ properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND);
+ properties.add(CONNECTOR_READ_PARTITION_UPPER_BOUND);
+ properties.add(CONNECTOR_READ_FETCH_SIZE);
+
+ // lookup options
+ properties.add(CONNECTOR_LOOKUP_CACHE_MAX_ROWS);
+ properties.add(CONNECTOR_LOOKUP_CACHE_TTL);
+ properties.add(CONNECTOR_LOOKUP_MAX_RETRIES);
+
+ // sink options
+ properties.add(CONNECTOR_WRITE_FLUSH_MAX_ROWS);
+ properties.add(CONNECTOR_WRITE_FLUSH_INTERVAL);
+ properties.add(CONNECTOR_WRITE_MAX_RETRIES);
+
+ // schema
+ properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
+ properties.add(SCHEMA + ".#." + SCHEMA_NAME);
+
+ return properties;
+ }
+
+ @Override
+ public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+ return JDBCTableSource.builder()
+ .setOptions(getJDBCOptions(descriptorProperties))
+ .setReadOptions(getJDBCReadOptions(descriptorProperties))
+ .setLookupOptions(getJDBCLookupOptions(descriptorProperties))
+ .setSchema(descriptorProperties.getTableSchema(SCHEMA))
+ .build();
+ }
+
+ @Override
+ public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+
+ final JDBCUpsertTableSink.Builder builder = JDBCUpsertTableSink.builder()
+ .setOptions(getJDBCOptions(descriptorProperties))
+ .setTableSchema(descriptorProperties.getTableSchema(SCHEMA));
+
+ descriptorProperties.getOptionalInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS).ifPresent(builder::setFlushMaxSize);
+ descriptorProperties.getOptionalDuration(CONNECTOR_WRITE_FLUSH_INTERVAL).ifPresent(
+ s -> builder.setFlushIntervalMills(s.toMillis()));
+ descriptorProperties.getOptionalInt(CONNECTOR_WRITE_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes);
+
+ return builder.build();
+ }
+
+ private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putProperties(properties);
+
+ new SchemaValidator(true, false, false).validate(descriptorProperties);
+ new JDBCValidator().validate(descriptorProperties);
+
+ return descriptorProperties;
+ }
+
+ private JDBCOptions getJDBCOptions(DescriptorProperties descriptorProperties) {
+ final String url = descriptorProperties.getString(CONNECTOR_URL);
+ final JDBCOptions.Builder builder = JDBCOptions.builder()
+ .setDBUrl(url)
+ .setTableName(descriptorProperties.getString(CONNECTOR_TABLE))
+ .setDialect(JDBCDialects.get(url).get());
+
+ descriptorProperties.getOptionalString(CONNECTOR_DRIVER).ifPresent(builder::setDriverName);
+ descriptorProperties.getOptionalString(CONNECTOR_USERNAME).ifPresent(builder::setUsername);
+ descriptorProperties.getOptionalString(CONNECTOR_PASSWORD).ifPresent(builder::setPassword);
+
+ return builder.build();
+ }
+
+ private JDBCReadOptions getJDBCReadOptions(DescriptorProperties descriptorProperties) {
+ final Optional<String> partitionColumnName =
+ descriptorProperties.getOptionalString(CONNECTOR_READ_PARTITION_COLUMN);
+ final Optional<Long> partitionLower = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND);
+ final Optional<Long> partitionUpper = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_UPPER_BOUND);
+ final Optional<Integer> numPartitions = descriptorProperties.getOptionalInt(CONNECTOR_READ_PARTITION_NUM);
+
+ final JDBCReadOptions.Builder builder = JDBCReadOptions.builder();
+ if (partitionColumnName.isPresent()) {
+ builder.setPartitionColumnName(partitionColumnName.get());
+ builder.setPartitionLowerBound(partitionLower.get());
+ builder.setPartitionUpperBound(partitionUpper.get());
+ builder.setNumPartitions(numPartitions.get());
+ }
+ descriptorProperties.getOptionalInt(CONNECTOR_READ_FETCH_SIZE).ifPresent(builder::setFetchSize);
+
+ return builder.build();
+ }
+
+ private JDBCLookupOptions getJDBCLookupOptions(DescriptorProperties descriptorProperties) {
+ final JDBCLookupOptions.Builder builder = JDBCLookupOptions.builder();
+
+ descriptorProperties.getOptionalLong(CONNECTOR_LOOKUP_CACHE_MAX_ROWS).ifPresent(builder::setCacheMaxSize);
+ descriptorProperties.getOptionalDuration(CONNECTOR_LOOKUP_CACHE_TTL).ifPresent(
+ s -> builder.setCacheExpireMs(s.toMillis()));
+ descriptorProperties.getOptionalInt(CONNECTOR_LOOKUP_MAX_RETRIES).ifPresent(builder::setMaxRetryTimes);
+
+ return builder.build();
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
index 6a2df0d..f6d0b9b 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCUpsertTableSink.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.types.Row;
import java.util.Arrays;
+import java.util.Objects;
import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_INTERVAL_MILLS;
import static org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.DEFAULT_FLUSH_MAX_SIZE;
@@ -145,6 +146,22 @@ public class JDBCUpsertTableSink implements UpsertStreamTableSink<Row> {
return new Builder();
}
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof JDBCUpsertTableSink) {
+ JDBCUpsertTableSink sink = (JDBCUpsertTableSink) o;
+ return Objects.equals(schema, sink.schema) &&
+ Objects.equals(options, sink.options) &&
+ Objects.equals(flushMaxSize, sink.flushMaxSize) &&
+ Objects.equals(flushIntervalMills, sink.flushIntervalMills) &&
+ Objects.equals(maxRetryTime, sink.maxRetryTime) &&
+ Arrays.equals(keyFields, sink.keyFields) &&
+ Objects.equals(isAppendOnly, sink.isAppendOnly);
+ } else {
+ return false;
+ }
+ }
+
/**
* Builder for a {@link JDBCUpsertTableSink}.
*/
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
index 970fbb9..8c53eac 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialect.java
@@ -126,6 +126,6 @@ public interface JDBCDialect extends Serializable {
.map(f -> quoteIdentifier(f) + "=?")
.collect(Collectors.joining(" AND "));
return "SELECT " + selectExpressions + " FROM " +
- quoteIdentifier(tableName) + " WHERE " + fieldExpressions;
+ quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
}
}
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
index 4b8ecd6..0e9011d 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
@@ -18,9 +18,9 @@
package org.apache.flink.api.java.io.jdbc.split;
-import java.io.Serializable;
+import org.apache.flink.util.Preconditions;
-import static org.apache.flink.util.Preconditions.checkArgument;
+import java.io.Serializable;
/**
* This query parameter generator is a helper class to parameterize from/to queries on a numeric column.
@@ -29,19 +29,32 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*
* <p>For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
* <PRE>
- * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
* </PRE>
*
* <p>You can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
* based on the passed constructor parameters.
- *
*/
public class NumericBetweenParametersProvider implements ParameterValuesProvider {
- private final long fetchSize;
private final long minVal;
private final long maxVal;
+ private long batchSize;
+ private int batchNum;
+
+ /**
+ * NumericBetweenParametersProvider constructor.
+ *
+ * @param minVal the lower bound of the produced "from" values
+ * @param maxVal the upper bound of the produced "to" values
+ */
+ public NumericBetweenParametersProvider(long minVal, long maxVal) {
+ Preconditions.checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
+ this.minVal = minVal;
+ this.maxVal = maxVal;
+ }
+
/**
* NumericBetweenParametersProvider constructor.
*
@@ -50,27 +63,51 @@ public class NumericBetweenParametersProvider implements ParameterValuesProvider
* @param maxVal the upper bound of the produced "to" values
*/
public NumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) {
- checkArgument(fetchSize > 0, "Fetch size must be greater than 0.");
- checkArgument(minVal <= maxVal, "Min value cannot be greater than max value.");
- this.fetchSize = fetchSize;
+ Preconditions.checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
this.minVal = minVal;
this.maxVal = maxVal;
+ ofBatchSize(fetchSize);
+ }
+
+ public NumericBetweenParametersProvider ofBatchSize(long batchSize) {
+ Preconditions.checkArgument(batchSize > 0, "Batch size must be positive");
+
+ long maxElemCount = (maxVal - minVal) + 1;
+ if (batchSize > maxElemCount) {
+ batchSize = maxElemCount;
+ }
+ this.batchSize = batchSize;
+ this.batchNum = (int) Math.ceil((double) maxElemCount / batchSize);
+ return this;
+ }
+
+ public NumericBetweenParametersProvider ofBatchNum(int batchNum) {
+ Preconditions.checkArgument(batchNum > 0, "Batch number must be positive");
+
+ long maxElemCount = (maxVal - minVal) + 1;
+ if (batchNum > maxElemCount) {
+ batchNum = (int) maxElemCount;
+ }
+ this.batchNum = batchNum;
+ this.batchSize = (long) Math.ceil((double) maxElemCount / batchNum);
+ return this;
}
@Override
public Serializable[][] getParameterValues() {
- double maxElemCount = (maxVal - minVal) + 1;
- int numBatches = new Double(Math.ceil(maxElemCount / fetchSize)).intValue();
- Serializable[][] parameters = new Serializable[numBatches][2];
- int batchIndex = 0;
- for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) {
- long end = start + fetchSize - 1;
- if (end > maxVal) {
- end = maxVal;
- }
- parameters[batchIndex] = new Long[]{start, end};
+ Preconditions.checkState(batchSize > 0,
+ "Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?");
+
+ long maxElemCount = (maxVal - minVal) + 1;
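+ // the first `bigBatchNum` batches get `batchSize` elements, the remaining
+ // ones get `batchSize - 1`; e.g. minVal=-5, maxVal=11, batchNum=5 gives
+ // batchSize=4 and bigBatchNum=2, so the split sizes are 4, 4, 3, 3, 3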
+ long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
+
+ Serializable[][] parameters = new Serializable[batchNum][2];
+ long start = minVal;
+ for (int i = 0; i < batchNum; i++) {
+ long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
+ parameters[i] = new Long[]{start, end};
+ start = end + 1;
}
return parameters;
}
-
}
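The old three-argument constructor now delegates to the fluent form, so callers can size the splits either way. A sketch, consistent with the test updates below (min, max, fetchSize and numPartitions stand for the caller's values):

    // fixed number of ids per split
    // (was: new NumericBetweenParametersProvider(fetchSize, min, max))
    ParameterValuesProvider bySize =
        new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);

    // fixed number of splits, with the per-split size derived from the range
    ParameterValuesProvider byNum =
        new NumericBetweenParametersProvider(min, max).ofBatchNum(numPartitions);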
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java
new file mode 100644
index 0000000..b0a7c18
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/table/descriptors/JDBCValidator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
+import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Optional;
+
+/**
+ * The validator for JDBC.
+ */
+@Internal
+public class JDBCValidator extends ConnectorDescriptorValidator {
+
+ public static final String CONNECTOR_TYPE_VALUE_JDBC = "jdbc";
+
+ public static final String CONNECTOR_URL = "connector.url";
+ public static final String CONNECTOR_TABLE = "connector.table";
+ public static final String CONNECTOR_DRIVER = "connector.driver";
+ public static final String CONNECTOR_USERNAME = "connector.username";
+ public static final String CONNECTOR_PASSWORD = "connector.password";
+
+ public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column";
+ public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = "connector.read.partition.lower-bound";
+ public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = "connector.read.partition.upper-bound";
+ public static final String CONNECTOR_READ_PARTITION_NUM = "connector.read.partition.num";
+ public static final String CONNECTOR_READ_FETCH_SIZE = "connector.read.fetch-size";
+
+ public static final String CONNECTOR_LOOKUP_CACHE_MAX_ROWS = "connector.lookup.cache.max-rows";
+ public static final String CONNECTOR_LOOKUP_CACHE_TTL = "connector.lookup.cache.ttl";
+ public static final String CONNECTOR_LOOKUP_MAX_RETRIES = "connector.lookup.max-retries";
+
+ public static final String CONNECTOR_WRITE_FLUSH_MAX_ROWS = "connector.write.flush.max-rows";
+ public static final String CONNECTOR_WRITE_FLUSH_INTERVAL = "connector.write.flush.interval";
+ public static final String CONNECTOR_WRITE_MAX_RETRIES = "connector.write.max-retries";
+
+ @Override
+ public void validate(DescriptorProperties properties) {
+ super.validate(properties);
+ validateCommonProperties(properties);
+ validateReadProperties(properties);
+ validateLookupProperties(properties);
+ validateSinkProperties(properties);
+ }
+
+ private void validateCommonProperties(DescriptorProperties properties) {
+ properties.validateString(CONNECTOR_URL, false, 1);
+ properties.validateString(CONNECTOR_TABLE, false, 1);
+ properties.validateString(CONNECTOR_DRIVER, true);
+ properties.validateString(CONNECTOR_USERNAME, true);
+ properties.validateString(CONNECTOR_PASSWORD, true);
+
+ final String url = properties.getString(CONNECTOR_URL);
+ final Optional<JDBCDialect> dialect = JDBCDialects.get(url);
+ Preconditions.checkState(dialect.isPresent(), "Cannot handle such JDBC url: " + url);
+
+ Optional<String> password = properties.getOptionalString(CONNECTOR_PASSWORD);
+ if (password.isPresent()) {
+ Preconditions.checkArgument(
+ properties.getOptionalString(CONNECTOR_USERNAME).isPresent(),
+ "Database username must be provided when database password is provided");
+ }
+ }
+
+ private void validateReadProperties(DescriptorProperties properties) {
+ properties.validateString(CONNECTOR_READ_PARTITION_COLUMN, true);
+ properties.validateLong(CONNECTOR_READ_PARTITION_LOWER_BOUND, true);
+ properties.validateLong(CONNECTOR_READ_PARTITION_UPPER_BOUND, true);
+ properties.validateInt(CONNECTOR_READ_PARTITION_NUM, true);
+ properties.validateInt(CONNECTOR_READ_FETCH_SIZE, true);
+
+ Optional<Long> lowerBound = properties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND);
+ Optional<Long> upperBound = properties.getOptionalLong(CONNECTOR_READ_PARTITION_UPPER_BOUND);
+ if (lowerBound.isPresent() && upperBound.isPresent()) {
+ Preconditions.checkArgument(lowerBound.get() <= upperBound.get(),
+ CONNECTOR_READ_PARTITION_LOWER_BOUND + " must not be larger than " + CONNECTOR_READ_PARTITION_UPPER_BOUND);
+ }
+
+ checkAllOrNone(properties, new String[]{
+ CONNECTOR_READ_PARTITION_COLUMN,
+ CONNECTOR_READ_PARTITION_LOWER_BOUND,
+ CONNECTOR_READ_PARTITION_UPPER_BOUND,
+ CONNECTOR_READ_PARTITION_NUM
+ });
+ }
+
+ private void validateLookupProperties(DescriptorProperties properties) {
+ properties.validateLong(CONNECTOR_LOOKUP_CACHE_MAX_ROWS, true);
+ properties.validateDuration(CONNECTOR_LOOKUP_CACHE_TTL, true, 1);
+ properties.validateInt(CONNECTOR_LOOKUP_MAX_RETRIES, true);
+
+ checkAllOrNone(properties, new String[]{
+ CONNECTOR_LOOKUP_CACHE_MAX_ROWS,
+ CONNECTOR_LOOKUP_CACHE_TTL
+ });
+ }
+
+ private void validateSinkProperties(DescriptorProperties properties) {
+ properties.validateInt(CONNECTOR_WRITE_FLUSH_MAX_ROWS, true);
+ properties.validateDuration(CONNECTOR_WRITE_FLUSH_INTERVAL, true, 1);
+ properties.validateInt(CONNECTOR_WRITE_MAX_RETRIES, true);
+ }
+
+ private void checkAllOrNone(DescriptorProperties properties, String[] propertyNames) {
+ int presentCount = 0;
+ for (String name : propertyNames) {
+ if (properties.getOptionalString(name).isPresent()) {
+ presentCount++;
+ }
+ }
+ Preconditions.checkArgument(presentCount == 0 || presentCount == propertyNames.length,
+ "Either all or none of the following properties should be provided:\n" + String.join("\n", propertyNames));
+ }
+}
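As validateReadProperties enforces, the four partition keys are all-or-none and the lower bound may not exceed the upper bound. For example, a valid scan configuration (values taken from the factory test below):

    connector.read.partition.column=aaa
    connector.read.partition.lower-bound=-10
    connector.read.partition.upper-bound=100
    connector.read.partition.num=10
    connector.read.fetch-size=20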
diff --git a/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
new file mode 100644
index 0000000..dbd648d
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index 91b175d..14dc85a 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -85,7 +85,7 @@ public class JDBCFullTest extends JDBCTestBase {
//use a "splittable" query to exploit parallelism
inputBuilder = inputBuilder
.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
- .setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
+ .setParametersProvider(new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize));
}
DataSet<Row> source = environment.createInput(inputBuilder.finish());
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 81320e6..559976d 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -221,7 +221,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
final int fetchSize = 1;
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - fetchSize].id;
- ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
+ ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
@@ -257,7 +257,7 @@ public class JDBCInputFormatTest extends JDBCTestBase {
final long min = TEST_DATA[0].id;
final long max = TEST_DATA[TEST_DATA.length - 1].id;
final long fetchSize = max + 1; //generate a single split
- ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
+ ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(DRIVER_CLASS)
.setDBUrl(DB_URL)
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
new file mode 100644
index 0000000..33c7b68
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Test for {@link JDBCTableSource} and {@link JDBCUpsertTableSink} created
+ * by {@link JDBCTableSourceSinkFactory}.
+ */
+public class JDBCTableSourceSinkFactoryTest {
+
+ @Test
+ public void testJDBCCommonProperties() {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
+ properties.put("connector.username", "user");
+ properties.put("connector.password", "pass");
+
+ final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties);
+
+ final JDBCOptions options = JDBCOptions.builder()
+ .setDBUrl("jdbc:derby:memory:mydb")
+ .setTableName("mytable")
+ .setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
+ .setUsername("user")
+ .setPassword("pass")
+ .build();
+ final TableSchema schema = TableSchema.builder()
+ .field("aaa", DataTypes.INT())
+ .field("bbb", DataTypes.STRING())
+ .field("ccc", DataTypes.DOUBLE())
+ .build();
+ final JDBCTableSource expected = JDBCTableSource.builder()
+ .setOptions(options)
+ .setSchema(schema)
+ .build();
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testJDBCReadProperties() {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.read.partition.column", "aaa");
+ properties.put("connector.read.partition.lower-bound", "-10");
+ properties.put("connector.read.partition.upper-bound", "100");
+ properties.put("connector.read.partition.num", "10");
+ properties.put("connector.read.fetch-size", "20");
+
+ final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties);
+
+ final JDBCOptions options = JDBCOptions.builder()
+ .setDBUrl("jdbc:derby:memory:mydb")
+ .setTableName("mytable")
+ .build();
+ final JDBCReadOptions readOptions = JDBCReadOptions.builder()
+ .setPartitionColumnName("aaa")
+ .setPartitionLowerBound(-10)
+ .setPartitionUpperBound(100)
+ .setNumPartitions(10)
+ .setFetchSize(20)
+ .build();
+ final TableSchema schema = TableSchema.builder()
+ .field("aaa", DataTypes.INT())
+ .field("bbb", DataTypes.STRING())
+ .field("ccc", DataTypes.DOUBLE())
+ .build();
+ final JDBCTableSource expected = JDBCTableSource.builder()
+ .setOptions(options)
+ .setReadOptions(readOptions)
+ .setSchema(schema)
+ .build();
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testJDBCLookupProperties() {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.lookup.cache.max-rows", "1000");
+ properties.put("connector.lookup.cache.ttl", "10s");
+ properties.put("connector.lookup.max-retries", "10");
+
+ final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties);
+
+ final JDBCOptions options = JDBCOptions.builder()
+ .setDBUrl("jdbc:derby:memory:mydb")
+ .setTableName("mytable")
+ .build();
+ final JDBCLookupOptions lookupOptions = JDBCLookupOptions.builder()
+ .setCacheMaxSize(1000)
+ .setCacheExpireMs(10_000)
+ .setMaxRetryTimes(10)
+ .build();
+ final TableSchema schema = TableSchema.builder()
+ .field("aaa", DataTypes.INT())
+ .field("bbb", DataTypes.STRING())
+ .field("ccc", DataTypes.DOUBLE())
+ .build();
+ final JDBCTableSource expected = JDBCTableSource.builder()
+ .setOptions(options)
+ .setLookupOptions(lookupOptions)
+ .setSchema(schema)
+ .build();
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testJDBCSinkProperties() {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.write.flush.max-rows", "1000");
+ properties.put("connector.write.flush.interval", "2min");
+ properties.put("connector.write.max-retries", "5");
+
+ final StreamTableSink<?> actual = TableFactoryService.find(StreamTableSinkFactory.class, properties)
+ .createStreamTableSink(properties);
+
+ final JDBCOptions options = JDBCOptions.builder()
+ .setDBUrl("jdbc:derby:memory:mydb")
+ .setTableName("mytable")
+ .build();
+ final TableSchema schema = TableSchema.builder()
+ .field("aaa", DataTypes.INT())
+ .field("bbb", DataTypes.STRING())
+ .field("ccc", DataTypes.DOUBLE())
+ .build();
+ final JDBCUpsertTableSink expected = JDBCUpsertTableSink.builder()
+ .setOptions(options)
+ .setTableSchema(schema)
+ .setFlushMaxSize(1000)
+ .setFlushIntervalMills(120_000)
+ .setMaxRetryTimes(5)
+ .build();
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testJDBCWithProjection() {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
+ properties.put("connector.username", "user");
+ properties.put("connector.password", "pass");
+
+ final TableSource<?> actual = ((JDBCTableSource) TableFactoryService
+ .find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties))
+ .projectFields(new int[] {0, 2});
+
+ Map<String, DataType> projectedFields = ((FieldsDataType) actual.getProducedDataType()).getFieldDataTypes();
+ assertEquals(projectedFields.get("aaa"), DataTypes.INT());
+ assertNull(projectedFields.get("bbb"));
+ assertEquals(projectedFields.get("ccc"), DataTypes.DOUBLE());
+ }
+
+ @Test
+ public void testJDBCValidation() {
+ // only password, no username
+ try {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.password", "pass");
+
+ TableFactoryService.find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties);
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // read partition properties not complete
+ try {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.read.partition.column", "aaa");
+ properties.put("connector.read.partition.lower-bound", "-10");
+ properties.put("connector.read.partition.upper-bound", "100");
+
+ TableFactoryService.find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties);
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // read partition lower-bound > upper-bound
+ try {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.read.partition.column", "aaa");
+ properties.put("connector.read.partition.lower-bound", "100");
+ properties.put("connector.read.partition.upper-bound", "-10");
+ properties.put("connector.read.partition.num", "10");
+
+ TableFactoryService.find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties);
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // lookup cache properties not complete
+ try {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.lookup.cache.max-rows", "10");
+
+ TableFactoryService.find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties);
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // lookup cache properties not complete
+ try {
+ Map<String, String> properties = getBasicProperties();
+ properties.put("connector.lookup.cache.ttl", "1s");
+
+ TableFactoryService.find(StreamTableSourceFactory.class, properties)
+ .createStreamTableSource(properties);
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+ }
+
+ private Map<String, String> getBasicProperties() {
+ Map<String, String> properties = new HashMap<>();
+
+ properties.put("connector.type", "jdbc");
+ properties.put("connector.property-version", "1");
+
+ properties.put("connector.url", "jdbc:derby:memory:mydb");
+ properties.put("connector.table", "mytable");
+
+ properties.put("schema.0.name", "aaa");
+ properties.put("schema.0.type", "INT");
+ properties.put("schema.1.name", "bbb");
+ properties.put("schema.1.type", "VARCHAR");
+ properties.put("schema.2.name", "ccc");
+ properties.put("schema.2.type", "DOUBLE");
+
+ return properties;
+ }
+}
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java
new file mode 100644
index 0000000..51f77e7
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProviderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.split;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link NumericBetweenParametersProvider}.
+ */
+public class NumericBetweenParametersProviderTest {
+
+ @Test
+ public void testBatchSizeDivisible() {
+ NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 9).ofBatchSize(3);
+ Serializable[][] actual = provider.getParameterValues();
+
+ long[][] expected = {
+ new long[]{-5, -3},
+ new long[]{-2, 0},
+ new long[]{1, 3},
+ new long[]{4, 6},
+ new long[]{7, 9}
+ };
+ check(expected, actual);
+ }
+
+ @Test
+ public void testBatchSizeNotDivisible() {
+ NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 11).ofBatchSize(4);
+ Serializable[][] actual = provider.getParameterValues();
+
+ long[][] expected = {
+ new long[]{-5, -2},
+ new long[]{-1, 2},
+ new long[]{3, 5},
+ new long[]{6, 8},
+ new long[]{9, 11}
+ };
+ check(expected, actual);
+ }
+
+ @Test
+ public void testBatchSizeTooLarge() {
+ NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(0, 2).ofBatchSize(5);
+ Serializable[][] actual = provider.getParameterValues();
+
+ long[][] expected = {new long[]{0, 2}};
+ check(expected, actual);
+ }
+
+ @Test
+ public void testBatchNumDivisible() {
+ NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 9).ofBatchNum(5);
+ Serializable[][] actual = provider.getParameterValues();
+
+ long[][] expected = {
+ new long[]{-5, -3},
+ new long[]{-2, 0},
+ new long[]{1, 3},
+ new long[]{4, 6},
+ new long[]{7, 9}
+ };
+ check(expected, actual);
+ }
+
+ @Test
+ public void testBatchNumNotDivisible() {
+ NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(-5, 11).ofBatchNum(5);
+ Serializable[][] actual = provider.getParameterValues();
+
+ long[][] expected = {
+ new long[]{-5, -2},
+ new long[]{-1, 2},
+ new long[]{3, 5},
+ new long[]{6, 8},
+ new long[]{9, 11}
+ };
+ check(expected, actual);
+ }
+
+ @Test
+ public void testBatchNumTooLarge() {
+ NumericBetweenParametersProvider provider = new NumericBetweenParametersProvider(0, 2).ofBatchNum(5);
+ Serializable[][] actual = provider.getParameterValues();
+
+ long[][] expected = {
+ new long[]{0, 0},
+ new long[]{1, 1},
+ new long[]{2, 2}};
+ check(expected, actual);
+ }
+
+ private void check(long[][] expected, Serializable[][] actual) {
+ assertEquals(expected.length, actual.length);
+ for (int i = 0; i < expected.length; i++) {
+ for (int j = 0; j < 2; j++) {
+ assertEquals(expected[i][j], ((Long) actual[i][j]).longValue());
+ }
+ }
+ }
+
+}