You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/27 07:18:38 UTC

[GitHub] [flink] wuchong commented on a change in pull request #13800: [FLINK-19650][connectors jdbc]Support the limit push down for the Jdb…

wuchong commented on a change in pull request #13800:
URL: https://github.com/apache/flink/pull/13800#discussion_r512459822



##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
##########
@@ -154,7 +155,7 @@ public void testProject() throws Exception {
 				")"
 		);
 
-		Iterator<Row> collected = tEnv.executeSql("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE).collect();
+		Iterator<Row> collected = tEnv.executeSql("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE + " LIMIT 1").collect();

Review comment:
       This doesn't break the test?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
##########
@@ -92,8 +99,14 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
 			.setPassword(options.getPassword().orElse(null))
 			.setAutoCommit(readOptions.getAutoCommit());
 
-		if (readOptions.getFetchSize() != 0) {
-			builder.setFetchSize(readOptions.getFetchSize());
+		if (readOptions.getFetchSize() != 0 || this.limit >= 0) {
+			int fetchsize = readOptions.getFetchSize();
+			if (fetchsize == 0) {
+				fetchsize = limit;
+			} else if (limit != -1) {
+				fetchsize = Integer.min(fetchsize, limit);
+			}
+			builder.setFetchSize(fetchsize);

Review comment:
       We don't need to adapt fetch size, it is a best effort read size. 

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
##########
@@ -166,4 +167,38 @@ public void testProject() throws Exception {
 				.sorted().collect(Collectors.toList());
 		assertEquals(expected, result);
 	}
+
+	@Test
+	public void testLimit() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
+				.useBlinkPlanner()
+				.inStreamingMode()
+				.build();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
+
+		tEnv.executeSql(
+				"CREATE TABLE " + INPUT_TABLE + "(" +
+						"id BIGINT," +
+						"timestamp6_col TIMESTAMP(6)," +
+						"timestamp9_col TIMESTAMP(9)," +
+						"time_col TIME," +
+						"real_col FLOAT," +
+						"double_col DOUBLE," +
+						"decimal_col DECIMAL(10, 4)" +
+						") WITH (" +
+						"  'connector'='jdbc'," +
+						"  'url'='" + DB_URL + "'," +
+						"  'table-name'='" + INPUT_TABLE + "'" +
+						")"
+		);
+
+		Iterator<Row> collected = tEnv.executeSql("SELECT id FROM " + INPUT_TABLE + " LIMIT 1").collect();

Review comment:
       Please select all the fields. 

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
##########
@@ -156,11 +169,25 @@ public boolean equals(Object o) {
 			Objects.equals(readOptions, that.readOptions) &&
 			Objects.equals(lookupOptions, that.lookupOptions) &&
 			Objects.equals(physicalSchema, that.physicalSchema) &&
-			Objects.equals(dialectName, that.dialectName);
+			Objects.equals(dialectName, that.dialectName) &&
+			Objects.equals(limit, that.limit);
 	}
 
 	@Override
 	public int hashCode() {
 		return Objects.hash(options, readOptions, lookupOptions, physicalSchema, dialectName);
 	}
+
+	/**
+	 * {@link java.sql.Statement#setFetchSize(int)} only accepts int value.
+	 */
+	@Override
+	public void applyLimit(long limit) {
+		if (limit > Integer.MAX_VALUE) {
+			throw new TableException(
+					String.format("The maximum limit value is %d for jdbc connector. Get %d.",
+							Integer.MAX_VALUE, limit));
+		}
+		this.limit = Math.toIntExact(limit);

Review comment:
       We don't need to cast to int. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org