Posted to issues@paimon.apache.org by "yuzelin (via GitHub)" <gi...@apache.org> on 2023/11/08 09:42:07 UTC

[PR] [flink] Support flink dynamic partition pruning [incubator-paimon]

yuzelin opened a new pull request, #2291:
URL: https://github.com/apache/incubator-paimon/pull/2291

   ### Purpose
   
   To support dynamic partition pruning (DPP) for batch queries.
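
For readers unfamiliar with the term, here is a minimal sketch of the idea behind dynamic partition pruning (DPP): partition values collected at runtime from the other side of a join are used to discard source splits before they are read. The `Split` class and `prune` method are hypothetical stand-ins for illustration only; they are not the Paimon or Flink classes touched by this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DynamicPruningSketch {

    /** Hypothetical stand-in for a source split; not a Paimon or Flink class. */
    static final class Split {
        final String file;
        final String partition; // value of the partition field for this split

        Split(String file, String partition) {
            this.file = file;
            this.partition = partition;
        }
    }

    /** Keeps only the splits whose partition value appears in the runtime-collected set. */
    static List<Split> prune(List<Split> splits, Set<String> runtimePartitions) {
        List<Split> kept = new ArrayList<>();
        for (Split split : splits) {
            if (runtimePartitions.contains(split.partition)) {
                kept.add(split);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Split> splits = Arrays.asList(
                new Split("f1", "2023-11-06"),
                new Split("f2", "2023-11-07"),
                new Split("f3", "2023-11-08"));
        // Assumed to arrive at runtime, e.g. from the filtered dimension side of a join.
        Set<String> runtimePartitions =
                new HashSet<>(Arrays.asList("2023-11-07", "2023-11-08"));
        for (Split split : prune(splits, runtimePartitions)) {
            System.out.println(split.file);
        }
    }
}
```

In this sketch the split for partition `2023-11-06` is dropped before any data is read, which is the saving DPP aims for.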
   
   ### Tests
   `StaticFileStoreSplitEnumeratorTestBase`
   `BatchFileStoreITCase`
   `ContinuousFileStoreITCase`
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [flink] Support flink dynamic partition pruning [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on code in PR #2291:
URL: https://github.com/apache/incubator-paimon/pull/2291#discussion_r1387607049


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java:
##########
@@ -125,6 +129,22 @@ public FlinkSourceBuilder withWatermarkStrategy(
         return this;
     }
 
+    public FlinkSourceBuilder withDynamicPartitionFilteringFields(
+            List<String> dynamicPartitionFilteringFields) {
+        if (dynamicPartitionFilteringFields != null && !dynamicPartitionFilteringFields.isEmpty()) {
+            checkState(
+                    table instanceof AbstractFileStoreTable,
+                    "Only Paimon AbstractFileStoreTable supports dynamic filtering but get %s.",
+                    table.getClass().getName());
+
+            this.dynamicPartitionFilteringInfo =
+                    new DynamicPartitionFilteringInfo(
+                            ((AbstractFileStoreTable) table).logicPartitionType(),
+                            dynamicPartitionFilteringFields);

Review Comment:
   My mistake. I'll delete it.




Re: [PR] [flink] Support flink dynamic partition pruning [incubator-paimon]

Posted by "tsreaper (via GitHub)" <gi...@apache.org>.
tsreaper commented on code in PR #2291:
URL: https://github.com/apache/incubator-paimon/pull/2291#discussion_r1387586217


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -95,4 +117,35 @@ public void close() {
     public Snapshot snapshot() {
         return snapshot;
     }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof DynamicFilteringEvent) {
+            DynamicFilteringData dynamicFilteringData =
+                    ((DynamicFilteringEvent) sourceEvent).getData();
+            LOG.warn(

Review Comment:
   `LOG.info` is enough.



##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -509,4 +510,8 @@ private RollbackHelper rollbackHelper() {
                 store().newSnapshotDeletion(),
                 store().newTagDeletion());
     }
+
+    public RowType logicPartitionType() {
+        return tableSchema.logicalPartitionType();
+    }

Review Comment:
   We can get `schema()` directly from a `FileStoreTable`. Why introduce this new method?



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java:
##########
@@ -125,6 +129,22 @@ public FlinkSourceBuilder withWatermarkStrategy(
         return this;
     }
 
+    public FlinkSourceBuilder withDynamicPartitionFilteringFields(
+            List<String> dynamicPartitionFilteringFields) {
+        if (dynamicPartitionFilteringFields != null && !dynamicPartitionFilteringFields.isEmpty()) {
+            checkState(
+                    table instanceof AbstractFileStoreTable,
+                    "Only Paimon AbstractFileStoreTable supports dynamic filtering but get %s.",
+                    table.getClass().getName());
+
+            this.dynamicPartitionFilteringInfo =
+                    new DynamicPartitionFilteringInfo(
+                            ((AbstractFileStoreTable) table).logicPartitionType(),
+                            dynamicPartitionFilteringFields);

Review Comment:
   ```suggestion
               checkState(
                       table instanceof FileStoreTable,
                       "Only Paimon FileStoreTable supports dynamic filtering but get %s.",
                       table.getClass().getName());
   
               this.dynamicPartitionFilteringInfo =
                       new DynamicPartitionFilteringInfo(
                               ((FileStoreTable) table).schema().logicalPartitionType(),
                               dynamicPartitionFilteringFields);
   ```




Re: [PR] [flink] Support flink dynamic partition pruning [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin commented on code in PR #2291:
URL: https://github.com/apache/incubator-paimon/pull/2291#discussion_r1387607341


##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -509,4 +510,8 @@ private RollbackHelper rollbackHelper() {
                 store().newSnapshotDeletion(),
                 store().newTagDeletion());
     }
+
+    public RowType logicPartitionType() {
+        return tableSchema.logicalPartitionType();
+    }

Review Comment:
   My mistake. I'll delete it.




Re: [PR] [flink] Support flink dynamic partition pruning [incubator-paimon]

Posted by "yuzelin (via GitHub)" <gi...@apache.org>.
yuzelin merged PR #2291:
URL: https://github.com/apache/incubator-paimon/pull/2291

