You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/19 17:50:35 UTC

[flink] 01/02: [FLINK-17361] Add custom query on JDBC tables

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c34a4f288deb7dd349c6e52f674d7fa95aa013a1
Author: Flavio Pompermaier <f....@gmail.com>
AuthorDate: Mon May 4 18:15:38 2020 +0200

    [FLINK-17361] Add custom query on JDBC tables
---
 docs/dev/table/connect.md                          |  6 ++++-
 .../jdbc/internal/options/JdbcReadOptions.java     | 29 ++++++++++++++++-----
 .../connector/jdbc/table/JdbcTableSource.java      |  9 +++++--
 .../jdbc/table/JdbcTableSourceSinkFactory.java     |  6 +++++
 .../flink/table/descriptors/JdbcValidator.java     |  2 ++
 .../jdbc/table/JdbcTableSourceITCase.java          | 30 ++++++++++++++++++++++
 .../jdbc/table/JdbcTableSourceSinkFactoryTest.java |  2 ++
 7 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 4a1e83a..ac2646c 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -1307,7 +1307,11 @@ CREATE TABLE MyUserTable (
   'connector.username' = 'name',
   'connector.password' = 'password',
   
-  -- **followings are scan options, optional, used when reading from table**
+  -- **the following are scan options, optional, used when reading from a table**
+
+  -- optional: SQL query / prepared statement.
+  -- If set, this will take precedence over the 'connector.table' setting.
+  'connector.read.query' = 'SELECT * FROM sometable',
 
   -- These options must all be specified if any of them is specified. In addition,
   -- partition.num must be specified. They describe how to partition the table when
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
index a1350ab..65b5729 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
@@ -27,6 +27,7 @@ import java.util.Optional;
  */
 public class JdbcReadOptions implements Serializable {
 
+	private final String query;
 	private final String partitionColumnName;
 	private final Long partitionLowerBound;
 	private final Long partitionUpperBound;
@@ -35,11 +36,13 @@ public class JdbcReadOptions implements Serializable {
 	private final int fetchSize;
 
 	private JdbcReadOptions(
+			String query,
 			String partitionColumnName,
 			Long partitionLowerBound,
 			Long partitionUpperBound,
 			Integer numPartitions,
 			int fetchSize) {
+		this.query = query;
 		this.partitionColumnName = partitionColumnName;
 		this.partitionLowerBound = partitionLowerBound;
 		this.partitionUpperBound = partitionUpperBound;
@@ -48,6 +51,10 @@ public class JdbcReadOptions implements Serializable {
 		this.fetchSize = fetchSize;
 	}
 
+	public Optional<String> getQuery() {
+		return Optional.ofNullable(query);
+	}
+
 	public Optional<String> getPartitionColumnName() {
 		return Optional.ofNullable(partitionColumnName);
 	}
@@ -76,11 +83,12 @@ public class JdbcReadOptions implements Serializable {
 	public boolean equals(Object o) {
 		if (o instanceof JdbcReadOptions) {
 			JdbcReadOptions options = (JdbcReadOptions) o;
-			return Objects.equals(partitionColumnName, options.partitionColumnName) &&
-				Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
-				Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
-				Objects.equals(numPartitions, options.numPartitions) &&
-				Objects.equals(fetchSize, options.fetchSize);
+			return Objects.equals(query, options.query) &&
+					Objects.equals(partitionColumnName, options.partitionColumnName) &&
+					Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
+					Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
+					Objects.equals(numPartitions, options.numPartitions) &&
+					Objects.equals(fetchSize, options.fetchSize);
 		} else {
 			return false;
 		}
@@ -90,6 +98,7 @@ public class JdbcReadOptions implements Serializable {
 	 * Builder of {@link JdbcReadOptions}.
 	 */
 	public static class Builder {
+		protected String query;
 		protected String partitionColumnName;
 		protected Long partitionLowerBound;
 		protected Long partitionUpperBound;
@@ -98,6 +107,14 @@ public class JdbcReadOptions implements Serializable {
 		protected int fetchSize = 0;
 
 		/**
+		 * optional, SQL query statement for this JDBC source.
+		 */
+		public Builder setQuery(String query) {
+			this.query = query;
+			return this;
+		}
+
+		/**
 		 * optional, name of the column used for partitioning the input.
 		 */
 		public Builder setPartitionColumnName(String partitionColumnName) {
@@ -140,7 +157,7 @@ public class JdbcReadOptions implements Serializable {
 
 		public JdbcReadOptions build() {
 			return new JdbcReadOptions(
-				partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
+				query, partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
index a599f2f..ff21aae 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
@@ -168,8 +168,7 @@ public class JdbcTableSource implements
 		}
 
 		final JdbcDialect dialect = options.getDialect();
-		String query = dialect.getSelectFromStatement(
-			options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]);
+		String query = getBaseQueryStatement(rowTypeInfo);
 		if (readOptions.getPartitionColumnName().isPresent()) {
 			long lowerBound = readOptions.getPartitionLowerBound().get();
 			long upperBound = readOptions.getPartitionUpperBound().get();
@@ -185,6 +184,12 @@ public class JdbcTableSource implements
 		return builder.finish();
 	}
 
+	private String getBaseQueryStatement(RowTypeInfo rowTypeInfo) {
+		return readOptions.getQuery().orElseGet(() ->
+			options.getDialect().getSelectFromStatement(
+				options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]));
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (o instanceof JdbcTableSource) {
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
index bdc8642..438779f 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
@@ -57,6 +57,7 @@ import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PA
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_NUM;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND;
+import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_QUERY;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TABLE;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TYPE_VALUE_JDBC;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_URL;
@@ -96,6 +97,7 @@ public class JdbcTableSourceSinkFactory implements
 		properties.add(CONNECTOR_PASSWORD);
 
 		// scan options
+		properties.add(CONNECTOR_READ_QUERY);
 		properties.add(CONNECTOR_READ_PARTITION_COLUMN);
 		properties.add(CONNECTOR_READ_PARTITION_NUM);
 		properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND);
@@ -184,6 +186,7 @@ public class JdbcTableSourceSinkFactory implements
 	}
 
 	private JdbcReadOptions getJdbcReadOptions(DescriptorProperties descriptorProperties) {
+		final Optional<String> query = descriptorProperties.getOptionalString(CONNECTOR_READ_QUERY);
 		final Optional<String> partitionColumnName =
 			descriptorProperties.getOptionalString(CONNECTOR_READ_PARTITION_COLUMN);
 		final Optional<Long> partitionLower = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND);
@@ -191,6 +194,9 @@ public class JdbcTableSourceSinkFactory implements
 		final Optional<Integer> numPartitions = descriptorProperties.getOptionalInt(CONNECTOR_READ_PARTITION_NUM);
 
 		final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+		if (query.isPresent()) {
+			builder.setQuery(query.get());
+		}
 		if (partitionColumnName.isPresent()) {
 			builder.setPartitionColumnName(partitionColumnName.get());
 			builder.setPartitionLowerBound(partitionLower.get());
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
index e8b0fe5..218759e 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
@@ -43,6 +43,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator {
 	public static final String CONNECTOR_USERNAME = "connector.username";
 	public static final String CONNECTOR_PASSWORD = "connector.password";
 
+	public static final String CONNECTOR_READ_QUERY = "connector.read.query";
 	public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column";
 	public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = "connector.read.partition.lower-bound";
 	public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = "connector.read.partition.upper-bound";
@@ -89,6 +90,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator {
 	}
 
 	private void validateReadProperties(DescriptorProperties properties) {
+		properties.validateString(CONNECTOR_READ_QUERY, true);
 		properties.validateString(CONNECTOR_READ_PARTITION_COLUMN, true);
 		properties.validateLong(CONNECTOR_READ_PARTITION_LOWER_BOUND, true);
 		properties.validateLong(CONNECTOR_READ_PARTITION_UPPER_BOUND, true);
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index 74e90b2..fa8d98a 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -154,4 +154,34 @@ public class JdbcTableSourceITCase extends AbstractTestBase {
 				"2020-01-01T15:36:01.123456,101.1234");
 		StreamITCase.compareWithList(expected);
 	}
+
+	@Test
+	public void testScanQueryJDBCSource() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
+			.useBlinkPlanner()
+			.inStreamingMode()
+			.build();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
+
+		final String testQuery = "SELECT id FROM " + INPUT_TABLE;
+		tEnv.sqlUpdate(
+			"CREATE TABLE test(" +
+				"id BIGINT" +
+				") WITH (" +
+				"  'connector.type'='jdbc'," +
+				"  'connector.url'='" + DB_URL + "'," +
+				"  'connector.table'='whatever'," +
+				"  'connector.read.query'='" + testQuery + "'" +
+				")"
+		);
+
+		StreamITCase.clear();
+		tEnv.toAppendStream(tEnv.sqlQuery("SELECT id FROM test"), Row.class)
+			.addSink(new StreamITCase.StringSink<>());
+		env.execute();
+
+		List<String> expected = Arrays.asList("1", "2");
+		StreamITCase.compareWithList(expected);
+	}
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
index 38cb29c..7f15565 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
@@ -88,6 +88,7 @@ public class JdbcTableSourceSinkFactoryTest {
 	@Test
 	public void testJdbcReadProperties() {
 		Map<String, String> properties = getBasicProperties();
+		properties.put("connector.read.query", "SELECT aaa FROM mytable");
 		properties.put("connector.read.partition.column", "aaa");
 		properties.put("connector.read.partition.lower-bound", "-10");
 		properties.put("connector.read.partition.upper-bound", "100");
@@ -102,6 +103,7 @@ public class JdbcTableSourceSinkFactoryTest {
 			.setTableName("mytable")
 			.build();
 		final JdbcReadOptions readOptions = JdbcReadOptions.builder()
+			.setQuery("SELECT aaa FROM mytable")
 			.setPartitionColumnName("aaa")
 			.setPartitionLowerBound(-10)
 			.setPartitionUpperBound(100)