Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/22 07:18:00 UTC

[GitHub] [flink] leonardBang commented on a change in pull request #13800: [FLINK-19650][connectors jdbc]Support the limit push down for the Jdb…

leonardBang commented on a change in pull request #13800:
URL: https://github.com/apache/flink/pull/13800#discussion_r547108176



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
##########
@@ -244,6 +244,7 @@ object FlinkStreamRuleSets {
     PushProjectIntoLegacyTableSourceScanRule.INSTANCE,
     PushFilterIntoTableSourceScanRule.INSTANCE,
     PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
+    PushLimitIntoTableSourceScanRule.INSTANCE,

Review comment:
       I think this rule should already have existed once limit push-down was supported in the planner. Why are we only adding it now?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -148,14 +150,15 @@ default String getDeleteStatement(String tableName, String[] conditionFields) {
 	/**
 	 * Get select fields statement by condition fields. Default use SELECT.
 	 */
-	default String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
+	default String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields, long limit) {
 		String selectExpressions = Arrays.stream(selectFields)
 				.map(this::quoteIdentifier)
 				.collect(Collectors.joining(", "));
 		String fieldExpressions = Arrays.stream(conditionFields)
 				.map(f -> format("%s = :%s", quoteIdentifier(f), f))
 				.collect(Collectors.joining(" AND "));
 		return "SELECT " + selectExpressions + " FROM " +
-				quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
+				quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "") +
+				(limit >= 0 ? " " + getLimit(limit) : "");

Review comment:
       Could we move the `limit >= 0 ? " " + getLimit(limit) : ""` check inside `getLimit(long limit)` itself?
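
       A minimal sketch of what I mean, not the actual connector code: `LimitClauseSketch` is a hypothetical stand-in class, the backtick quoting and the `LIMIT n` syntax are assumptions borrowed from a MySQL-style dialect, and here `getLimit` returns the leading space itself so the caller only has to concatenate.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical stand-in for a dialect; not the real JdbcDialect interface.
final class LimitClauseSketch {

    // Assumption: a negative limit means "no limit was pushed down".
    // The check and the leading space live here, so callers just append the result.
    static String getLimit(long limit) {
        return limit >= 0 ? " LIMIT " + limit : "";
    }

    // Assumption: MySQL-style backtick quoting, for illustration only.
    static String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    static String getSelectFromStatement(
            String tableName, String[] selectFields, String[] conditionFields, long limit) {
        String selectExpressions = Arrays.stream(selectFields)
                .map(LimitClauseSketch::quoteIdentifier)
                .collect(Collectors.joining(", "));
        String fieldExpressions = Arrays.stream(conditionFields)
                .map(f -> String.format("%s = :%s", quoteIdentifier(f), f))
                .collect(Collectors.joining(" AND "));
        // No ternary on the limit here any more; getLimit decides.
        return "SELECT " + selectExpressions + " FROM " + quoteIdentifier(tableName)
                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "")
                + getLimit(limit);
    }
}
```

       With this shape, every statement builder that needs a limit gets the same "negative means none" behavior without repeating the condition.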

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
##########
@@ -156,11 +163,17 @@ public boolean equals(Object o) {
 			Objects.equals(readOptions, that.readOptions) &&
 			Objects.equals(lookupOptions, that.lookupOptions) &&
 			Objects.equals(physicalSchema, that.physicalSchema) &&
-			Objects.equals(dialectName, that.dialectName);
+			Objects.equals(dialectName, that.dialectName) &&
+			Objects.equals(limit, that.limit);
 	}
 
 	@Override
 	public int hashCode() {
 		return Objects.hash(options, readOptions, lookupOptions, physicalSchema, dialectName);

Review comment:
       Please also add the missing `limit` to `hashCode()`, so that it stays in sync with `equals()`.
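
       For illustration only, a reduced sketch of the pattern: `SourceKeySketch` is a hypothetical class with just two of the fields, and typing `limit` as a primitive `long` is an assumption. Leaving a field out of `hashCode()` does not strictly break the `equals`/`hashCode` contract, but listing the same fields in both is the usual convention and avoids needless hash collisions.

```java
import java.util.Objects;

// Hypothetical, reduced stand-in for the table source; only two fields are kept.
final class SourceKeySketch {
    private final String dialectName;
    private final long limit; // assumption: limit is a primitive long

    SourceKeySketch(String dialectName, long limit) {
        this.dialectName = dialectName;
        this.limit = limit;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SourceKeySketch)) {
            return false;
        }
        SourceKeySketch that = (SourceKeySketch) o;
        return Objects.equals(dialectName, that.dialectName)
                && limit == that.limit;
    }

    @Override
    public int hashCode() {
        // Same fields as equals(), including limit.
        return Objects.hash(dialectName, limit);
    }
}
```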



