You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/19 17:50:35 UTC
[flink] 01/02: [FLINK-17361] Add custom query on JDBC tables
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c34a4f288deb7dd349c6e52f674d7fa95aa013a1
Author: Flavio Pompermaier <f....@gmail.com>
AuthorDate: Mon May 4 18:15:38 2020 +0200
[FLINK-17361] Add custom query on JDBC tables
---
docs/dev/table/connect.md | 6 ++++-
.../jdbc/internal/options/JdbcReadOptions.java | 29 ++++++++++++++++-----
.../connector/jdbc/table/JdbcTableSource.java | 9 +++++--
.../jdbc/table/JdbcTableSourceSinkFactory.java | 6 +++++
.../flink/table/descriptors/JdbcValidator.java | 2 ++
.../jdbc/table/JdbcTableSourceITCase.java | 30 ++++++++++++++++++++++
.../jdbc/table/JdbcTableSourceSinkFactoryTest.java | 2 ++
7 files changed, 75 insertions(+), 9 deletions(-)
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 4a1e83a..ac2646c 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -1307,7 +1307,11 @@ CREATE TABLE MyUserTable (
'connector.username' = 'name',
'connector.password' = 'password',
- -- **followings are scan options, optional, used when reading from table**
+ -- **followings are scan options, optional, used when reading from a table**
+
+ -- optional: SQL query / prepared statement.
+ -- If set, this will take precedence over the 'connector.table' setting
+ 'connector.read.query' = 'SELECT * FROM sometable',
-- These options must all be specified if any of them is specified. In addition,
-- partition.num must be specified. They describe how to partition the table when
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
index a1350ab..65b5729 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
@@ -27,6 +27,7 @@ import java.util.Optional;
*/
public class JdbcReadOptions implements Serializable {
+ private final String query;
private final String partitionColumnName;
private final Long partitionLowerBound;
private final Long partitionUpperBound;
@@ -35,11 +36,13 @@ public class JdbcReadOptions implements Serializable {
private final int fetchSize;
private JdbcReadOptions(
+ String query,
String partitionColumnName,
Long partitionLowerBound,
Long partitionUpperBound,
Integer numPartitions,
int fetchSize) {
+ this.query = query;
this.partitionColumnName = partitionColumnName;
this.partitionLowerBound = partitionLowerBound;
this.partitionUpperBound = partitionUpperBound;
@@ -48,6 +51,10 @@ public class JdbcReadOptions implements Serializable {
this.fetchSize = fetchSize;
}
+ public Optional<String> getQuery() {
+ return Optional.ofNullable(query);
+ }
+
public Optional<String> getPartitionColumnName() {
return Optional.ofNullable(partitionColumnName);
}
@@ -76,11 +83,12 @@ public class JdbcReadOptions implements Serializable {
public boolean equals(Object o) {
if (o instanceof JdbcReadOptions) {
JdbcReadOptions options = (JdbcReadOptions) o;
- return Objects.equals(partitionColumnName, options.partitionColumnName) &&
- Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
- Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
- Objects.equals(numPartitions, options.numPartitions) &&
- Objects.equals(fetchSize, options.fetchSize);
+ return Objects.equals(query, options.query) &&
+ Objects.equals(partitionColumnName, options.partitionColumnName) &&
+ Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
+ Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
+ Objects.equals(numPartitions, options.numPartitions) &&
+ Objects.equals(fetchSize, options.fetchSize);
} else {
return false;
}
@@ -90,6 +98,7 @@ public class JdbcReadOptions implements Serializable {
* Builder of {@link JdbcReadOptions}.
*/
public static class Builder {
+ protected String query;
protected String partitionColumnName;
protected Long partitionLowerBound;
protected Long partitionUpperBound;
@@ -98,6 +107,14 @@ public class JdbcReadOptions implements Serializable {
protected int fetchSize = 0;
/**
+ * optional, SQL query statement for this JDBC source.
+ */
+ public Builder setQuery(String query) {
+ this.query = query;
+ return this;
+ }
+
+ /**
* optional, name of the column used for partitioning the input.
*/
public Builder setPartitionColumnName(String partitionColumnName) {
@@ -140,7 +157,7 @@ public class JdbcReadOptions implements Serializable {
public JdbcReadOptions build() {
return new JdbcReadOptions(
- partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
+ query, partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
}
}
}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
index a599f2f..ff21aae 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
@@ -168,8 +168,7 @@ public class JdbcTableSource implements
}
final JdbcDialect dialect = options.getDialect();
- String query = dialect.getSelectFromStatement(
- options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]);
+ String query = getBaseQueryStatement(rowTypeInfo);
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
@@ -185,6 +184,12 @@ public class JdbcTableSource implements
return builder.finish();
}
+ private String getBaseQueryStatement(RowTypeInfo rowTypeInfo) {
+ return readOptions.getQuery().orElseGet(() ->
+ options.getDialect().getSelectFromStatement(
+ options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]));
+ }
+
@Override
public boolean equals(Object o) {
if (o instanceof JdbcTableSource) {
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
index bdc8642..438779f 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
@@ -57,6 +57,7 @@ import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PA
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_NUM;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND;
+import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_QUERY;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TABLE;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TYPE_VALUE_JDBC;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_URL;
@@ -96,6 +97,7 @@ public class JdbcTableSourceSinkFactory implements
properties.add(CONNECTOR_PASSWORD);
// scan options
+ properties.add(CONNECTOR_READ_QUERY);
properties.add(CONNECTOR_READ_PARTITION_COLUMN);
properties.add(CONNECTOR_READ_PARTITION_NUM);
properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND);
@@ -184,6 +186,7 @@ public class JdbcTableSourceSinkFactory implements
}
private JdbcReadOptions getJdbcReadOptions(DescriptorProperties descriptorProperties) {
+ final Optional<String> query = descriptorProperties.getOptionalString(CONNECTOR_READ_QUERY);
final Optional<String> partitionColumnName =
descriptorProperties.getOptionalString(CONNECTOR_READ_PARTITION_COLUMN);
final Optional<Long> partitionLower = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND);
@@ -191,6 +194,9 @@ public class JdbcTableSourceSinkFactory implements
final Optional<Integer> numPartitions = descriptorProperties.getOptionalInt(CONNECTOR_READ_PARTITION_NUM);
final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+ if (query.isPresent()) {
+ builder.setQuery(query.get());
+ }
if (partitionColumnName.isPresent()) {
builder.setPartitionColumnName(partitionColumnName.get());
builder.setPartitionLowerBound(partitionLower.get());
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
index e8b0fe5..218759e 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
@@ -43,6 +43,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_USERNAME = "connector.username";
public static final String CONNECTOR_PASSWORD = "connector.password";
+ public static final String CONNECTOR_READ_QUERY = "connector.read.query";
public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column";
public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = "connector.read.partition.lower-bound";
public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = "connector.read.partition.upper-bound";
@@ -89,6 +90,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator {
}
private void validateReadProperties(DescriptorProperties properties) {
+ properties.validateString(CONNECTOR_READ_QUERY, true);
properties.validateString(CONNECTOR_READ_PARTITION_COLUMN, true);
properties.validateLong(CONNECTOR_READ_PARTITION_LOWER_BOUND, true);
properties.validateLong(CONNECTOR_READ_PARTITION_UPPER_BOUND, true);
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index 74e90b2..fa8d98a 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -154,4 +154,34 @@ public class JdbcTableSourceITCase extends AbstractTestBase {
"2020-01-01T15:36:01.123456,101.1234");
StreamITCase.compareWithList(expected);
}
+
+ @Test
+ public void testScanQueryJDBCSource() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
+
+ final String testQuery = "SELECT id FROM " + INPUT_TABLE;
+ tEnv.sqlUpdate(
+ "CREATE TABLE test(" +
+ "id BIGINT" +
+ ") WITH (" +
+ " 'connector.type'='jdbc'," +
+ " 'connector.url'='" + DB_URL + "'," +
+ " 'connector.table'='whatever'," +
+ " 'connector.read.query'='" + testQuery + "'" +
+ ")"
+ );
+
+ StreamITCase.clear();
+ tEnv.toAppendStream(tEnv.sqlQuery("SELECT id FROM test"), Row.class)
+ .addSink(new StreamITCase.StringSink<>());
+ env.execute();
+
+ List<String> expected = Arrays.asList("1", "2");
+ StreamITCase.compareWithList(expected);
+ }
}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
index 38cb29c..7f15565 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
@@ -88,6 +88,7 @@ public class JdbcTableSourceSinkFactoryTest {
@Test
public void testJdbcReadProperties() {
Map<String, String> properties = getBasicProperties();
+ properties.put("connector.read.query", "SELECT aaa FROM mytable");
properties.put("connector.read.partition.column", "aaa");
properties.put("connector.read.partition.lower-bound", "-10");
properties.put("connector.read.partition.upper-bound", "100");
@@ -102,6 +103,7 @@ public class JdbcTableSourceSinkFactoryTest {
.setTableName("mytable")
.build();
final JdbcReadOptions readOptions = JdbcReadOptions.builder()
+ .setQuery("SELECT aaa FROM mytable")
.setPartitionColumnName("aaa")
.setPartitionLowerBound(-10)
.setPartitionUpperBound(100)