Posted to commits@hudi.apache.org by "beyond1920 (via GitHub)" <gi...@apache.org> on 2023/03/06 10:27:46 UTC

[GitHub] [hudi] beyond1920 opened a new pull request, #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

beyond1920 opened a new pull request, #8102:
URL: https://github.com/apache/hudi/pull/8102

   ### Change Logs
   
   This PR aims to support partition pruning for the Flink streaming source at runtime.
   
   
   ### Impact
   
   Currently, partition pruning only works in the compile phase, so newly created partitions cannot be pruned at runtime.
   This PR aims to solve that.
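   
   A minimal usage sketch of the new behavior (the option key `read.continuous.partition-pruning.enabled` is taken from this PR's diff; the table schema, path, and `TableEnvironment` setup below are illustrative only):
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class StreamingPruneSketch {
     public static void main(String[] args) {
       TableEnvironment tEnv = TableEnvironment.create(
           EnvironmentSettings.newInstance().inStreamingMode().build());
       tEnv.executeSql(
           "CREATE TABLE t1 ("
               + "  uuid VARCHAR(20), name VARCHAR(20), age INT,"
               + "  ts TIMESTAMP(3), `partition` VARCHAR(20)"
               + ") PARTITIONED BY (`partition`) WITH ("
               + "  'connector' = 'hudi',"
               + "  'path' = 'file:///tmp/hudi/t1',"
               + "  'read.streaming.enabled' = 'true',"
               + "  'read.continuous.partition-pruning.enabled' = 'true'"
               + ")");
       // With runtime pruning enabled, partitions created after the job starts are
       // still filtered by the predicate below instead of being read unconditionally.
       tEnv.executeSql("SELECT * FROM t1 WHERE `partition` > 'par4'").print();
     }
   }
   ```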
   
   ### Risk level (write none, low medium or high below)
   
   NA
   
   ### Documentation Update
   
   NA
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] BruceKellan commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "BruceKellan (via GitHub)" <gi...@apache.org>.
BruceKellan commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1129235765


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -334,6 +334,12 @@ private FlinkOptions() {
       .withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by"
           + "skipping over files");
 
+  public static final ConfigOption<Boolean> READ_CONTINUOUS_PARTITION_PRUNING_ENABLED = ConfigOptions
+      .key("read.continuous.partition-pruning.enabled")
+      .booleanType()
+      .defaultValue(false) // continuous partition pruning is disabled by default

Review Comment:
   In what scenarios do users need to disable partition pruning?





[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1455924660

   ## CI report:
   
   * 8958c445f7354ed113e8ff0a1aa9f56261bede34 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1456288809

   ## CI report:
   
   * 8958c445f7354ed113e8ff0a1aa9f56261bede34 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1495273281

   ## CI report:
   
   * be88d99070504f75c88bfcf48b3c078ca93a35df Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097) 
   * a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157081803


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1892,4 +1992,66 @@ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sin
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  private void testContinuousPartitionPrune(
+      HoodieTableType tableType,
+      boolean hiveStylePartitioning,
+      String filterCondition,
+      List<RowData> results
+  ) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    String sinkDDL = "create table sink(\n"
+        + "  uuid varchar(20),\n"
+        + "  name varchar(20),\n"
+        + "  age int,\n"
+        + "  ts timestamp,\n"
+        + "  part varchar(20)"
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + ")";
+    TableResult tableResult = submitSelectSql(
+        streamTableEnv,
+        "select uuid, name, age, ts, `partition` as part from t1 where " + filterCondition,
+        sinkDDL);
+
+    // write second commit
+    TestData.writeData(DATA_SET_NEW_PARTITIONS, conf);
+    // stop the streaming query and fetch the result
+    List<Row> result = stopAndFetchData(streamTableEnv, tableResult, 10);
+    assertRowsEquals(result, results);
+  }
+
+  private TableResult submitSelectSql(TableEnvironment tEnv, String select, String sinkDDL) {
+    tEnv.executeSql("DROP TABLE IF EXISTS sink");
+    tEnv.executeSql(sinkDDL);
+    TableResult tableResult = tEnv.executeSql("insert into sink " + select);
+    return tableResult;
+  }
+
+  private List<Row> stopAndFetchData(TableEnvironment tEnv, TableResult tableResult, long timeout)
+      throws InterruptedException {
+      // wait for the timeout, then cancel the job

Review Comment:
   There is no need to hold on; just add enough partitions up front, then start the streaming read pipeline.
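   
   A sketch of that restructuring, reusing `TestData.writeData`, `submitSelectSql`, and `stopAndFetchData` from the diff above (the data sets and query are the ones in this test; the exact shape is illustrative):
   
   ```java
   // Write all partitions up front so the streaming read sees them immediately,
   // instead of racing a second commit against a timed wait.
   TestData.writeData(TestData.DATA_SET_INSERT, conf);
   TestData.writeData(DATA_SET_NEW_PARTITIONS, conf);
   
   TableResult tableResult = submitSelectSql(
       streamTableEnv,
       "select uuid, name, age, ts, `partition` as part from t1 where " + filterCondition,
       sinkDDL);
   
   // Fetch the result without depending on commit timing.
   List<Row> result = stopAndFetchData(streamTableEnv, tableResult, 10);
   assertRowsEquals(result, results);
   ```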





[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157240692


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Tools to prune partitions.
+ */
+public class PartitionPruners {
+
+  public interface PartitionPruner extends Serializable {
+
+    /**
+     * Applies partition pruning to the given partition list and returns the remaining partitions.
+     */
+    Set<String> filter(Collection<String> partitions);
+  }
+
+  /**
+   * Dynamic partition pruner for the hoodie table source whose partition list is available at runtime. Note: data in new partitions created after the job starts can be
+   * read as long as they match the filter conditions.
+   */
+  public static class DynamicPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<ExpressionEvaluators.Evaluator> partitionEvaluator;
+
+    private final String[] partitionKeys;
+
+    private final List<DataType> partitionTypes;
+
+    private final String defaultParName;
+
+    private final boolean hivePartition;
+
+    private DynamicPartitionPruner(
+        List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+        List<String> partitionKeys,
+        List<DataType> partitionTypes,
+        String defaultParName,
+        boolean hivePartition) {
+      this.partitionEvaluator = partitionEvaluators;
+      this.partitionKeys = partitionKeys.toArray(new String[] {});
+      this.partitionTypes = partitionTypes;
+      this.defaultParName = defaultParName;
+      this.hivePartition = hivePartition;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream().filter(this::evaluate).collect(Collectors.toSet());
+    }
+
+    private boolean evaluate(String partition) {
+      String[] partStrArray = FilePathUtils.extractPartitionKeyValues(
+          new org.apache.hadoop.fs.Path(partition),
+          hivePartition,
+          partitionKeys).values().toArray(new String[] {});
+      Map<String, ColumnStats> partStats = new LinkedHashMap<>();
+      for (int idx = 0; idx < partitionKeys.length; idx++) {
+        String partKey = partitionKeys[idx];
+        Object partVal = partKey.equals(defaultParName)
+            ? null : DataTypeUtils.resolvePartition(partStrArray[idx], partitionTypes.get(idx));
+        ColumnStats columnStats = new ColumnStats(partVal, partVal, partVal == null ? 1 : 0);
+        partStats.put(partKey, columnStats);
+      }
+      return partitionEvaluator.stream().allMatch(evaluator -> evaluator.eval(partStats));
+    }
+  }
+
+  /**
+   * Static partition pruner for the hoodie table source whose partition list is available in the compile phase.
+   * After this partition pruner is applied, the hoodie source cannot read data from any other partitions at runtime.
+   * Note: data in new partitions created after the job starts is never read.
+   */
+  public static class StaticPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<String> candidatePartitions;
+
+    private StaticPartitionPruner(Set<String> candidatePartitions) {
+      this.candidatePartitions = candidatePartitions;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream()
+          .filter(this.candidatePartitions::contains).collect(Collectors.toSet());
+    }
+  }
+
+  /**
+   * Fake partition pruner that does not prune any partitions.
+   */
+  public static class FakePartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private FakePartitionPruner() {
+
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      return new HashSet<>(partitions);
+    }
+  }
+
+  public static PartitionPruner getInstance(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    if (partitionEvaluators.isEmpty()) {
+      return new FakePartitionPruner();
+    } else {
+      return new DynamicPartitionPruner(partitionEvaluators, partitionKeys, partitionTypes, defaultParName, hivePartition);
+    }
+  }
+
+  public static PartitionPruner getInstance(Collection<String> candidatePartitions) {
+    Set<String> distinctPartitions = new HashSet<>(candidatePartitions);
+    if (distinctPartitions.isEmpty()) {

Review Comment:
   Just to keep the old case `TestInputFormat#testReadWithChangeLogCOW` working.
   Besides, `FakePartitionPruner` is also used when there are no simple partition-related conditions in the whole filter condition.
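   
   A minimal sketch of that fallback path, using the `PartitionPruners` API quoted in the diff above (the partition key, type, and default partition name here are illustrative):
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Set;
   
   import org.apache.flink.table.api.DataTypes;
   import org.apache.hudi.source.prune.PartitionPruners;
   
   class FakePrunerSketch {
     static void demo() {
       // No simple partition-related conditions -> empty evaluator list,
       // so getInstance(...) falls back to a FakePartitionPruner.
       PartitionPruners.PartitionPruner pruner = PartitionPruners.getInstance(
           Collections.emptyList(),                          // partitionEvaluators
           Collections.singletonList("partition"),           // partitionKeys
           Collections.singletonList(DataTypes.VARCHAR(20)), // partitionTypes
           "default",                                        // defaultParName (illustrative)
           false);                                           // hivePartition
   
       // Every partition survives the filter unchanged.
       Set<String> kept = pruner.filter(Arrays.asList("par1", "par2", "par3"));
       assert kept.size() == 3;
     }
   }
   ```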





[GitHub] [hudi] danny0405 commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1498397467

   The failed test case `ITTestHoodieDataSource#testWriteAndReadWithDataSkipping` is failing continuously in other CI runs as well, so it should not have been introduced by this patch; would merge it soon ~


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1465065937

   ## CI report:
   
   * f500063efc2baefad6c6c53e77f25b730b0c0346 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675) 
   * 5b46e06d81e5680024cca39a94f00b2f18c179e0 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1148387029


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/StaticPartitionPruner.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Static partition pruner for the hoodie table source whose partition list is available in the compile phase.
+ * After this partition pruner is applied, the hoodie source cannot read data from any other partitions at runtime.
+ * Note: data in new partitions created after the job starts is never read.
+ */
+public class StaticPartitionPruner implements PartitionPruner {
+
+  private static final long serialVersionUID = 1L;

Review Comment:
   In fact, `DynamicPartitionPruner` and `StaticPartitionPruner` can never be active at the same time.
   `StaticPartitionPruner` only works for `HoodieTableSource`, because the Flink optimizer pushes down the satisfied partitions instead of the partition-pruning filter conditions.
   `DynamicPartitionPruner` only works for `HoodieContinuousPartitionTableSource`, because there the Flink optimizer pushes down the partition-pruning conditions instead of the satisfied partitions.
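   
   A behavioral sketch of the difference, against the `PartitionPruners` API from this PR (the partition values are illustrative):
   
   ```java
   import java.util.Arrays;
   
   import org.apache.hudi.source.prune.PartitionPruners;
   
   class PrunerContrastSketch {
     static void demo() {
       // Static: the planner resolved the candidate partitions at compile time, so a
       // partition created later ("par5") is never read, even when it matches the filter.
       PartitionPruners.PartitionPruner staticPruner =
           PartitionPruners.getInstance(Arrays.asList("par1", "par2"));
       staticPruner.filter(Arrays.asList("par1", "par2", "par5")); // -> {par1, par2}
   
       // Dynamic: the pruner carries the predicate itself (as evaluators) and
       // re-evaluates it at runtime, so a newly created "par5" is kept as soon as it
       // satisfies the pushed-down condition.
     }
   }
   ```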





[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1483885437

   ## CI report:
   
   * 07e8d65356ba30c13740c585d737104f30d674a5 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157170289


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Tools to prune partitions.
+ */
+public class PartitionPruners {
+
+  public interface PartitionPruner extends Serializable {
+
+    /**
+     * Applies partition pruning to the given partition list and returns the remaining partitions.
+     */
+    Set<String> filter(Collection<String> partitions);
+  }
+
+  /**
+   * Dynamic partition pruner for the hoodie table source whose partition list is available at runtime. Note: data in new partitions created after the job starts can be
+   * read as long as they match the filter conditions.
+   */
+  public static class DynamicPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<ExpressionEvaluators.Evaluator> partitionEvaluator;
+
+    private final String[] partitionKeys;
+
+    private final List<DataType> partitionTypes;
+
+    private final String defaultParName;
+
+    private final boolean hivePartition;
+
+    private DynamicPartitionPruner(
+        List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+        List<String> partitionKeys,
+        List<DataType> partitionTypes,
+        String defaultParName,
+        boolean hivePartition) {
+      this.partitionEvaluator = partitionEvaluators;
+      this.partitionKeys = partitionKeys.toArray(new String[] {});
+      this.partitionTypes = partitionTypes;
+      this.defaultParName = defaultParName;
+      this.hivePartition = hivePartition;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream().filter(this::evaluate).collect(Collectors.toSet());
+    }
+
+    private boolean evaluate(String partition) {
+      String[] partStrArray = FilePathUtils.extractPartitionKeyValues(
+          new org.apache.hadoop.fs.Path(partition),
+          hivePartition,
+          partitionKeys).values().toArray(new String[] {});
+      Map<String, ColumnStats> partStats = new LinkedHashMap<>();
+      for (int idx = 0; idx < partitionKeys.length; idx++) {
+        String partKey = partitionKeys[idx];
+        Object partVal = partKey.equals(defaultParName)
+            ? null : DataTypeUtils.resolvePartition(partStrArray[idx], partitionTypes.get(idx));
+        ColumnStats columnStats = new ColumnStats(partVal, partVal, partVal == null ? 1 : 0);
+        partStats.put(partKey, columnStats);
+      }
+      return partitionEvaluator.stream().allMatch(evaluator -> evaluator.eval(partStats));
+    }
+  }
+
+  /**
+   * Static partition pruner for the hoodie table source whose partition list is available in the compile phase.
+   * After this partition pruner is applied, the hoodie source cannot read data from any other partitions at runtime.
+   * Note: data in new partitions created after the job starts is never read.
+   */
+  public static class StaticPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<String> candidatePartitions;
+
+    private StaticPartitionPruner(Set<String> candidatePartitions) {
+      this.candidatePartitions = candidatePartitions;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream()
+          .filter(this.candidatePartitions::contains).collect(Collectors.toSet());
+    }
+  }
+
+  /**
+   * Fake partition pruner that does not prune any partitions.
+   */
+  public static class FakePartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private FakePartitionPruner() {
+
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      return new HashSet<>(partitions);
+    }
+  }
+
+  public static PartitionPruner getInstance(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    if (partitionEvaluators.isEmpty()) {
+      return new FakePartitionPruner();

Review Comment:
   In which cases can the `partitionEvaluators` be empty?





[GitHub] [hudi] BruceKellan commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "BruceKellan (via GitHub)" <gi...@apache.org>.
BruceKellan commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1129234266


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -334,6 +334,12 @@ private FlinkOptions() {
       .withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by"
           + "skipping over files");
 
+  public static final ConfigOption<Boolean> READ_CONTINUOUS_PARTITION_PRUNING_ENABLED = ConfigOptions
+      .key("read.continuous.partition-pruning.enabled")

Review Comment:
   Can we follow the existing specification and change this configuration to `read.streaming.....`?





[GitHub] [hudi] beyond1920 commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1486353684

   @danny0405 Thanks for the suggestion.
   The PR actually already uses the second way you suggest; please see `HoodieContinuousPartitionTableSource`.
   I just keep `HoodieTableSource` at the same time to provide a fallback to the original behavior in case a query doesn't work well with the new `HoodieContinuousPartitionTableSource`.
   




[GitHub] [hudi] flashJd commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "flashJd (via GitHub)" <gi...@apache.org>.
flashJd commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1203545727


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,85 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts partition predicate from filter condition.
+   *
+   * <p>NOTE: the {@code expressions} should be simple call expressions.
+   *
+   * @return A tuple of partition predicates and non-partition predicates.
+   */
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitExprByPartitionCall(
+      List<ResolvedExpression> expressions,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(expressions, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      final List<String> fieldNames = tableRowType.getFieldNames();
+      Set<Integer> parFieldPos = partitionKeys.stream().map(fieldNames::indexOf).collect(Collectors.toSet());
+      for (ResolvedExpression expr : expressions) {
+        for (CallExpression e : splitByAnd(expr)) {
+          if (isPartitionCallExpr(e, parFieldPos)) {
+            partitionFilters.add(expr);

Review Comment:
   should it be partitionFilters.add(e)?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,85 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts partition predicate from filter condition.
+   *
+   * <p>NOTE: the {@code expressions} should be simple call expressions.
+   *
+   * @return A tuple of partition predicates and non-partition predicates.
+   */
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitExprByPartitionCall(
+      List<ResolvedExpression> expressions,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(expressions, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      final List<String> fieldNames = tableRowType.getFieldNames();
+      Set<Integer> parFieldPos = partitionKeys.stream().map(fieldNames::indexOf).collect(Collectors.toSet());
+      for (ResolvedExpression expr : expressions) {
+        for (CallExpression e : splitByAnd(expr)) {

Review Comment:
   I used the predicate "where age > 5 and `partition` = 1"; the Flink framework splits the "and" and calls `applyFilters(List<ResolvedExpression> filters)` with a two-element list: (greaterThan(age, 5), equals(partition, 1)).
   So I wonder under which conditions we should call `splitByAnd(expr)`, since the expr has already been split by Flink @beyond1920
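   
   For reference, a generic split-by-AND helper sketched against Flink's expression API; this is an illustration of what such a method is expected to do, not the PR's actual implementation:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   import org.apache.flink.table.expressions.CallExpression;
   import org.apache.flink.table.expressions.Expression;
   import org.apache.flink.table.expressions.ResolvedExpression;
   import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
   
   class SplitByAndSketch {
     // Recursively flattens nested AND conjunctions into individual conjuncts.
     // An expression that is not an AND call is returned as-is, which is why
     // filters the planner has already split pass through unchanged.
     static List<ResolvedExpression> splitByAnd(ResolvedExpression expr) {
       List<ResolvedExpression> conjuncts = new ArrayList<>();
       if (expr instanceof CallExpression
           && ((CallExpression) expr).getFunctionDefinition() == BuiltInFunctionDefinitions.AND) {
         for (Expression child : expr.getChildren()) {
           conjuncts.addAll(splitByAnd((ResolvedExpression) child));
         }
       } else {
         conjuncts.add(expr);
       }
       return conjuncts;
     }
   }
   ```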





[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1464880012

   ## CI report:
   
   * 8958c445f7354ed113e8ff0a1aa9f56261bede34 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593) 
   * 9fd31f69d5ae5a349a553cb207337833958c7099 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1465088667

   ## CI report:
   
   * e2ceb307219b4e27b276ab986c47bf77a1ec2d25 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1155909030


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,123 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts partition predicate from filter condition.
+   * Returns partition predicates and non-partition predicates.
+   */
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> extractPartitionPredicateList(
+      List<ResolvedExpression> exprs,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(exprs, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      int[] partitionIdxMapping = tableRowType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+      for (ResolvedExpression expr : exprs) {

Review Comment:
   Why do we need to change the type of `partitionIdxMapping`?





[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157178360


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1835,6 +1885,75 @@ private static Stream<Arguments> indexAndTableTypeParams() {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (HoodieTableType, hive style partitioning, filter condition, expected result).
+   */
+  private static Stream<Arguments> tableTypeAndFilters() {
+    HoodieTableType[] tableTypes = {HoodieTableType.COPY_ON_WRITE, HoodieTableType.MERGE_ON_READ};
+    boolean [] isHiveStylePartitions = {true, false};
+    Pair<String,  List<RowData>>[] filterAndResults = new Pair[] {
+        Pair.of(
+            "`partition` = 'par5' or `partition` = 'par6'",
+            Arrays.asList(
+                insertRow(StringData.fromString("id9"), StringData.fromString("LiLi"), 24,
+                    TimestampData.fromEpochMillis(9), StringData.fromString("par5")))),
+        Pair.of(
+        "`partition` >= 'par5' and `partition` <= 'par6'",
+        Arrays.asList(
+            insertRow(StringData.fromString("id9"), StringData.fromString("LiLi"), 24,
+                TimestampData.fromEpochMillis(9), StringData.fromString("par5")))),
+        Pair.of(
+        "`partition` <> 'par7'",
+        Arrays.asList(
+            insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+                TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+            insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+                TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+            insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,

Review Comment:
   Can the test be simplified by using one input data set?





[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1497729726

   ## CI report:
   
   * d549b98077efb89ce3e5edeebdb54088f26d345c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16141) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157229473


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,123 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts partition predicates from the filter conditions.
+   * Returns partition predicates and non-partition predicates.
+   */
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> extractPartitionPredicateList(
+      List<ResolvedExpression> exprs,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(exprs, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      int[] partitionIdxMapping = tableRowType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+      for (ResolvedExpression expr : exprs) {

Review Comment:
   I need a mapping from each field's index in the original table schema to that field's index in the partition key list.
   Do you mean changing it to a Map?
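   
   For illustration, here is a hypothetical example (field names borrowed from the test schema) of what the `int[]` mapping holds, next to the `Map` form I understand the suggestion to be:
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   List<String> fieldNames = Arrays.asList("uuid", "name", "age", "ts", "part");
   List<String> partitionKeys = Collections.singletonList("part");
   
   // Array form: for every field of the table schema, the index of that field
   // inside the partition key list, or -1 when the field is not a partition key.
   int[] partitionIdxMapping = fieldNames.stream()
       .mapToInt(partitionKeys::indexOf)
       .toArray();                                   // -> [-1, -1, -1, -1, 0]
   
   // Sparse Map form: only the fields that actually are partition keys.
   Map<Integer, Integer> fieldToPartitionIdx = new HashMap<>();
   for (int i = 0; i < fieldNames.size(); i++) {
     int idx = partitionKeys.indexOf(fieldNames.get(i));
     if (idx >= 0) {
       fieldToPartitionIdx.put(i, idx);              // -> {4=0}
     }
   }
   ```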



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1496022408

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     }, {
       "hash" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097",
       "triggerID" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106",
       "triggerID" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "daabef5271b668f376ff2f7e82071d56c717cbbe",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "daabef5271b668f376ff2f7e82071d56c717cbbe",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106) 
   * daabef5271b668f376ff2f7e82071d56c717cbbe UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1478895092

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e2ceb307219b4e27b276ab986c47bf77a1ec2d25 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678) 
   * 3ad4f77d51bc9a280161865028f89f362fcdb754 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1483842164

   @danny0405 Thanks for your review.
   I want to explain why I introduced a new `HoodieContinuousPartitionTableSource` class and a configuration option that determines whether this improvement is enabled.
   Currently, Flink's partition pruning mechanism has two limitations:
   1. Partition pruning happens in the optimization phase; no further partition pruning is performed at runtime.
   2. Once partition pruning is done, the Flink optimizer removes the partition-related predicates from the filter conditions, so they can no longer be accessed in the later filter push-down phase.
   So the new `TableSource` must not implement `SupportsPartitionPushDown`, which ensures it can still see all the filter conditions of the SQL query. However, unlike Flink's native partition pruning, it cannot handle complex filter conditions, such as predicates that contain a UDF, so some complex queries might execute more slowly. That is why I want to introduce a new class guarded by a configuration option.
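   
   To make the design concrete, here is a minimal sketch (class name and wiring are illustrative, not the exact code of this PR): the source implements only `SupportsFilterPushDown`, accepts no filters so the planner keeps applying all of them, and retains the simple partition predicates for pruning at runtime:
   
   ```java
   import java.util.Collections;
   import java.util.List;
   
   import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
   import org.apache.flink.table.expressions.ResolvedExpression;
   
   import org.apache.hudi.util.ExpressionUtils;
   
   // In the real source this ability is mixed into the ScanTableSource
   // implementation; it is shown standalone here for brevity.
   public class ContinuousPartitionPruningSketch implements SupportsFilterPushDown {
   
     private List<ResolvedExpression> partitionFilters = Collections.emptyList();
   
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
       // Keep the simple predicates so partitions discovered at runtime
       // can still be evaluated against them ...
       this.partitionFilters = ExpressionUtils.filterSimpleCallExpression(filters);
       // ... but accept nothing: the planner keeps every filter in the plan,
       // so the partition predicates are never stripped by the optimizer.
       return Result.of(Collections.emptyList(), filters);
     }
   }
   ```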
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1460039753

   Hi @BruceKellan, thanks for your attention and review. I will address your comment later.
   BTW, this PR is blocked by two other PRs, [PR-8051](https://github.com/apache/hudi/pull/8051) and [PR-8101](https://github.com/apache/hudi/pull/8101). Could you please take a look at those two first?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1155943634


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1892,4 +1992,66 @@ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sin
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  private void testContinuousPartitionPrune(
+      HoodieTableType tableType,
+      boolean hiveStylePartitioning,
+      String filterCondition,
+      List<RowData> results
+  ) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    String sinkDDL = "create table sink(\n"
+        + "  uuid varchar(20),\n"
+        + "  name varchar(20),\n"
+        + "  age int,\n"
+        + "  ts timestamp,\n"
+        + "  part varchar(20)"
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + ")";
+    TableResult tableResult = submitSelectSql(
+        streamTableEnv,
+        "select uuid, name, age, ts, `partition` as part from t1 where " + filterCondition,
+        sinkDDL);
+
+    // write second commit
+    TestData.writeData(DATA_SET_NEW_PARTITIONS, conf);
+    // stop the streaming query and fetch the result
+    List<Row> result = stopAndFetchData(streamTableEnv, tableResult, 10);
+    assertRowsEquals(result, results);
+  }
+
+  private TableResult submitSelectSql(TableEnvironment tEnv, String select, String sinkDDL) {
+    tEnv.executeSql("DROP TABLE IF EXISTS sink");
+    tEnv.executeSql(sinkDDL);
+    TableResult tableResult = tEnv.executeSql("insert into sink " + select);
+    return tableResult;
+  }
+
+  private List<Row> stopAndFetchData(TableEnvironment tEnv, TableResult tableResult, long timeout)
+      throws InterruptedException {
+    // wait for the timeout, then cancel the job

Review Comment:
   No, I need a new method that submits a streaming job, keeps it running while I add the new partitions, and then fetches the result of that streaming job.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1464878756

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8958c445f7354ed113e8ff0a1aa9f56261bede34 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593) 
   * 9fd31f69d5ae5a349a553cb207337833958c7099 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1455912035

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8958c445f7354ed113e8ff0a1aa9f56261bede34 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1493606617

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 07e8d65356ba30c13740c585d737104f30d674a5 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916) 
   * d2d2f59eb8ed24b0835099deb0c61d8ca993073f Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1494088279

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f763056c7fe6bdfeb44e93bf4df137da2e4afd21 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157240692


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Tools to prune partitions.
+ */
+public class PartitionPruners {
+
+  public interface PartitionPruner extends Serializable {
+
+    /**
+     * Applies partition pruning to the given partition list and returns the remaining partitions.
+     */
+    Set<String> filter(Collection<String> partitions);
+  }
+
+  /**
+   * Dynamic partition pruner for the hoodie table source whose partition list is resolved at runtime.
+   * Note: data in new partitions created after the job starts can be read if it matches the filter conditions.
+   */
+  public static class DynamicPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<ExpressionEvaluators.Evaluator> partitionEvaluator;
+
+    private final String[] partitionKeys;
+
+    private final List<DataType> partitionTypes;
+
+    private final String defaultParName;
+
+    private final boolean hivePartition;
+
+    private DynamicPartitionPruner(
+        List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+        List<String> partitionKeys,
+        List<DataType> partitionTypes,
+        String defaultParName,
+        boolean hivePartition) {
+      this.partitionEvaluator = partitionEvaluators;
+      this.partitionKeys = partitionKeys.toArray(new String[] {});
+      this.partitionTypes = partitionTypes;
+      this.defaultParName = defaultParName;
+      this.hivePartition = hivePartition;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream().filter(this::evaluate).collect(Collectors.toSet());
+    }
+
+    private boolean evaluate(String partition) {
+      String[] partStrArray = FilePathUtils.extractPartitionKeyValues(
+          new org.apache.hadoop.fs.Path(partition),
+          hivePartition,
+          partitionKeys).values().toArray(new String[] {});
+      Map<String, ColumnStats> partStats = new LinkedHashMap<>();
+      for (int idx = 0; idx < partitionKeys.length; idx++) {
+        String partKey = partitionKeys[idx];
+        Object partVal = partStrArray[idx].equals(defaultParName)
+            ? null : DataTypeUtils.resolvePartition(partStrArray[idx], partitionTypes.get(idx));
+        ColumnStats columnStats = new ColumnStats(partVal, partVal, partVal == null ? 1 : 0);
+        partStats.put(partKey, columnStats);
+      }
+      return partitionEvaluator.stream().allMatch(evaluator -> evaluator.eval(partStats));
+    }
+  }
+
+  /**
+   * Static partition pruner for the hoodie table source whose partition list is determined at compile time.
+   * After this pruner is applied, the hoodie source cannot read data from other partitions at runtime.
+   * Note: data in new partitions created after the job starts will never be read.
+   */
+  public static class StaticPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<String> candidatePartitions;
+
+    private StaticPartitionPruner(Set<String> candidatePartitions) {
+      this.candidatePartitions = candidatePartitions;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream()
+          .filter(this.candidatePartitions::contains).collect(Collectors.toSet());
+    }
+  }
+
+  /**
+   * Fake partition pruner which would not prune any partitions.
+   */
+  public static class FakePartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private FakePartitionPruner() {
+
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      return new HashSet<>(partitions);
+    }
+  }
+
+  public static PartitionPruner getInstance(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    if (partitionEvaluators.isEmpty()) {
+      return new FakePartitionPruner();
+    } else {
+      return new DynamicPartitionPruner(partitionEvaluators, partitionKeys, partitionTypes, defaultParName, hivePartition);
+    }
+  }
+
+  public static PartitionPruner getInstance(Collection<String> candidatePartitions) {
+    Set<String> distinctPartitions = new HashSet<>(candidatePartitions);
+    if (distinctPartitions.isEmpty()) {

Review Comment:
   Just to keep the old test case `TestInputFormat#testReadWithChangeLogCOW` working.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157242795


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Tools to prune partitions.
+ */
+public class PartitionPruners {
+
+  public interface PartitionPruner extends Serializable {
+
+    /**
+     * Applies partition pruning to the given partition list and returns the remaining partitions.
+     */
+    Set<String> filter(Collection<String> partitions);
+  }
+
+  /**
+   * Dynamic partition pruner for the hoodie table source whose partition list is resolved at runtime.
+   * Note: data in new partitions created after the job starts can be read if it matches the filter conditions.
+   */
+  public static class DynamicPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<ExpressionEvaluators.Evaluator> partitionEvaluator;
+
+    private final String[] partitionKeys;
+
+    private final List<DataType> partitionTypes;
+
+    private final String defaultParName;
+
+    private final boolean hivePartition;
+
+    private DynamicPartitionPruner(
+        List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+        List<String> partitionKeys,
+        List<DataType> partitionTypes,
+        String defaultParName,
+        boolean hivePartition) {
+      this.partitionEvaluator = partitionEvaluators;
+      this.partitionKeys = partitionKeys.toArray(new String[] {});
+      this.partitionTypes = partitionTypes;
+      this.defaultParName = defaultParName;
+      this.hivePartition = hivePartition;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream().filter(this::evaluate).collect(Collectors.toSet());
+    }
+
+    private boolean evaluate(String partition) {
+      String[] partStrArray = FilePathUtils.extractPartitionKeyValues(
+          new org.apache.hadoop.fs.Path(partition),
+          hivePartition,
+          partitionKeys).values().toArray(new String[] {});
+      Map<String, ColumnStats> partStats = new LinkedHashMap<>();
+      for (int idx = 0; idx < partitionKeys.length; idx++) {
+        String partKey = partitionKeys[idx];
+        Object partVal = partStrArray[idx].equals(defaultParName)
+            ? null : DataTypeUtils.resolvePartition(partStrArray[idx], partitionTypes.get(idx));
+        ColumnStats columnStats = new ColumnStats(partVal, partVal, partVal == null ? 1 : 0);
+        partStats.put(partKey, columnStats);
+      }
+      return partitionEvaluator.stream().allMatch(evaluator -> evaluator.eval(partStats));
+    }
+  }
+
+  /**
+   * Static partition pruner for the hoodie table source whose partition list is determined at compile time.
+   * After this pruner is applied, the hoodie source cannot read data from other partitions at runtime.
+   * Note: data in new partitions created after the job starts will never be read.
+   */
+  public static class StaticPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<String> candidatePartitions;
+
+    private StaticPartitionPruner(Set<String> candidatePartitions) {
+      this.candidatePartitions = candidatePartitions;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream()
+          .filter(this.candidatePartitions::contains).collect(Collectors.toSet());
+    }
+  }
+
+  /**
+   * Fake partition pruner which would not prune any partitions.
+   */
+  public static class FakePartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private FakePartitionPruner() {
+
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      return new HashSet<>(partitions);
+    }
+  }
+
+  public static PartitionPruner getInstance(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    if (partitionEvaluators.isEmpty()) {
+      return new FakePartitionPruner();

Review Comment:
   When there are no simple partition-related predicates in the overall filter condition.
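   
   For example (hypothetical usage, following the `getInstance` signature above): a query like `SELECT * FROM t1 WHERE my_udf(part) = 'x'` yields no simple partition predicates, so the evaluator list is empty and the returned pruner keeps every partition:
   
   ```java
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Set;
   
   import org.apache.flink.table.api.DataTypes;
   
   import org.apache.hudi.source.prune.PartitionPruners;
   
   PartitionPruners.PartitionPruner pruner = PartitionPruners.getInstance(
       Collections.emptyList(),                          // no simple partition evaluators
       Collections.singletonList("part"),                // partition keys
       Collections.singletonList(DataTypes.VARCHAR(20)), // partition types
       "default",                                        // default partition name
       false);                                           // hive-style partitioning disabled
   
   Collection<String> allPartitions = Arrays.asList("par1", "par2");
   Set<String> kept = pruner.filter(allPartitions);      // == {"par1", "par2"}
   ```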



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1464922113

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9fd31f69d5ae5a349a553cb207337833958c7099 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674) 
   * f500063efc2baefad6c6c53e77f25b730b0c0346 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1465074446

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5b46e06d81e5680024cca39a94f00b2f18c179e0 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677) 
   * e2ceb307219b4e27b276ab986c47bf77a1ec2d25 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1495434217

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     }, {
       "hash" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097",
       "triggerID" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106",
       "triggerID" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1497301879

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     }, {
       "hash" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097",
       "triggerID" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106",
       "triggerID" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "daabef5271b668f376ff2f7e82071d56c717cbbe",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16126",
       "triggerID" : "daabef5271b668f376ff2f7e82071d56c717cbbe",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a55fbb4f7e076f58e31ffd8b66280b9af08ceef4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a55fbb4f7e076f58e31ffd8b66280b9af08ceef4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * daabef5271b668f376ff2f7e82071d56c717cbbe Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16126) 
   * a55fbb4f7e076f58e31ffd8b66280b9af08ceef4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1483849591

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ad4f77d51bc9a280161865028f89f362fcdb754 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840) 
   * 07e8d65356ba30c13740c585d737104f30d674a5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1486248143

   > SupportsPartitionPushDown
   
   Thanks for the explanation, I think we have 2 ways to address the problem.
   
   1. Do we have some chance to fetch all the filters from the `HoodieTableSource` even though it implements `SupportsPartitionPushDown`, e.g. if `SupportsFilterPushDown` applies first? If we could, then we can maintain these partition filters ourselves instead of relying on the Flink optimizer.
   2. Remove the implementation of `SupportsPartitionPushDown` from `HoodieTableSource` and use the filter push-down directly for partition pruning in our `FileIndex` (a sketch of this option follows below).
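   
   A minimal sketch of what option 2 could look like. The class and helper names here (`FilterOnlyTableSource`, `collectFieldRefs`) are illustrative, not from this PR, and the wiring of the kept partition filters into `FileIndex` is omitted:
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   import org.apache.flink.table.connector.ChangelogMode;
   import org.apache.flink.table.connector.source.DynamicTableSource;
   import org.apache.flink.table.connector.source.ScanTableSource;
   import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
   import org.apache.flink.table.expressions.FieldReferenceExpression;
   import org.apache.flink.table.expressions.ResolvedExpression;
   
   /** Sketch: keep partition predicates inside the connector via filter push-down only. */
   public class FilterOnlyTableSource implements ScanTableSource, SupportsFilterPushDown {
   
     private final List<String> partitionKeys;
     private List<ResolvedExpression> partitionFilters = Collections.emptyList();
     private List<ResolvedExpression> dataFilters = Collections.emptyList();
   
     public FilterOnlyTableSource(List<String> partitionKeys) {
       this.partitionKeys = partitionKeys;
     }
   
     @Override
     public Result applyFilters(List<ResolvedExpression> filters) {
       List<ResolvedExpression> partFilters = new ArrayList<>();
       List<ResolvedExpression> restFilters = new ArrayList<>();
       for (ResolvedExpression expr : filters) {
         List<String> refs = new ArrayList<>();
         collectFieldRefs(expr, refs);
         // Treat a predicate as a partition predicate only if every field
         // it references is a partition key.
         if (!refs.isEmpty() && partitionKeys.containsAll(refs)) {
           partFilters.add(expr);
         } else {
           restFilters.add(expr);
         }
       }
       this.partitionFilters = partFilters;
       this.dataFilters = restFilters;
       // Report all filters as remaining: pruning here is best-effort and
       // Flink should still evaluate the predicates on the emitted rows.
       return Result.of(Collections.emptyList(), filters);
     }
   
     private static void collectFieldRefs(ResolvedExpression expr, List<String> out) {
       if (expr instanceof FieldReferenceExpression) {
         out.add(((FieldReferenceExpression) expr).getName());
       }
       expr.getResolvedChildren().forEach(child -> collectFieldRefs(child, out));
     }
   
     @Override
     public ChangelogMode getChangelogMode() {
       return ChangelogMode.insertOnly();
     }
   
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
       throw new UnsupportedOperationException("Sketch only: reader wiring omitted");
     }
   
     @Override
     public DynamicTableSource copy() {
       FilterOnlyTableSource copy = new FilterOnlyTableSource(partitionKeys);
       copy.partitionFilters = partitionFilters;
       copy.dataFilters = dataFilters;
       return copy;
     }
   
     @Override
     public String asSummaryString() {
       return "FilterOnlyTableSource";
     }
   }
   ```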


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1486571880

   > The original partition push down is not powerful enough, it can only filter out simple partition expressions, what is the corner case of the new push down for which we need a fallback then?
   
   The original partition push down is executed in the Flink framework, so it can actually handle more complex partition expressions, such as `MyUdf(part2) < 3` or `trim(part1) = 'A'`.
   
   > Does the 1st way work too?
   
   No. The default behavior of the Flink optimizer is to apply partition push down before filter push down.
   If we want to modify this behavior, we would have to define a custom `FlinkBatchProgram` or `FlinkStreamProgram`. That is a property of the job's table environment and belongs to the user's job code, so I cannot wrap this behavior inside the Hoodie connectors.
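   
   For illustration only, a hedged sketch of the kind of query involved; the table `t1` and the function `MyUdf` are hypothetical stand-ins taken from the examples above, not artifacts of this PR:
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   import org.apache.flink.table.functions.ScalarFunction;
   
   public class PartitionUdfPredicateExample {
   
     /** Hypothetical scalar UDF referenced in the partition predicate. */
     public static class MyUdf extends ScalarFunction {
       public Integer eval(String partitionValue) {
         return partitionValue == null ? null : partitionValue.length();
       }
     }
   
     public static void main(String[] args) {
       TableEnvironment tEnv =
           TableEnvironment.create(EnvironmentSettings.inStreamingMode());
       tEnv.createTemporarySystemFunction("MyUdf", MyUdf.class);
       // Both predicates reference only partition columns, yet neither is a
       // plain "column = literal" comparison. The planner's compile-time
       // partition pruning can evaluate them against the listed partitions,
       // while a simple connector-side pruner cannot, hence the fallback.
       tEnv.executeSql(
           "SELECT * FROM t1 WHERE MyUdf(part2) < 3 AND TRIM(part1) = 'A'");
     }
   }
   ```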


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1494579359

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     }, {
       "hash" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f763056c7fe6bdfeb44e93bf4df137da2e4afd21 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080) 
   * be88d99070504f75c88bfcf48b3c078ca93a35df UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1495279686

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     }, {
       "hash" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097",
       "triggerID" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106",
       "triggerID" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * be88d99070504f75c88bfcf48b3c078ca93a35df Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097) 
   * a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1493661443

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d2d2f59eb8ed24b0835099deb0c61d8ca993073f Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076) 
   * f763056c7fe6bdfeb44e93bf4df137da2e4afd21 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157176039


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,123 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts partition predicate from filter condition.
+   * Returns partition predicates and non-partition predicates.
+   */
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> extractPartitionPredicateList(
+      List<ResolvedExpression> exprs,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(exprs, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      int[] partitionIdxMapping = tableRowType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+      for (ResolvedExpression expr : exprs) {

Review Comment:
   Because
   
   ```java
   int target = fieldIdxMapping[refExpr.getFieldIndex()];
                     if (target >= 0) {
   ```
   
   is kind of confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1497382439

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     }, {
       "hash" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097",
       "triggerID" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106",
       "triggerID" : "a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "daabef5271b668f376ff2f7e82071d56c717cbbe",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16126",
       "triggerID" : "daabef5271b668f376ff2f7e82071d56c717cbbe",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a55fbb4f7e076f58e31ffd8b66280b9af08ceef4",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16138",
       "triggerID" : "a55fbb4f7e076f58e31ffd8b66280b9af08ceef4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d549b98077efb89ce3e5edeebdb54088f26d345c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16141",
       "triggerID" : "d549b98077efb89ce3e5edeebdb54088f26d345c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a55fbb4f7e076f58e31ffd8b66280b9af08ceef4 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16138) 
   * d549b98077efb89ce3e5edeebdb54088f26d345c Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16141) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1144213813


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,119 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> extractPartitionPredicateList(

Review Comment:
   Add some documentation for this method.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieContinuousPartitionSource.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.factory.CollectSinkTableFactory;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.utils.TestConfigurations.sql;
+import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.apache.hudi.utils.TestData.insertRow;
+
+/**
+ * IT cases for Hoodie table source and sink.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestHoodieContinuousPartitionSource {
+
+  private static List<RowData> DATA_SET_NEW_PARTITIONS = Arrays.asList(
+      insertRow(StringData.fromString("id9"), StringData.fromString("LiLi"), 24,
+          TimestampData.fromEpochMillis(9), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id10"), StringData.fromString("Guoguo"), 34,

Review Comment:
   Can we write a SQL test in `ITTestHoodieDataSource` instead of adding this class?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/StaticPartitionPruner.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Static partition pruner for the hoodie table source, whose partition list is available in the compile phase.
+ * After this partition pruner is applied, the hoodie source cannot read data from other partitions at runtime.
+ * Note: data in new partitions created after the job starts will never be read.
+ */
+public class StaticPartitionPruner implements PartitionPruner {
+
+  private static final long serialVersionUID = 1L;

Review Comment:
   Can we merge the PartitionPruner implementations into one? It can cache a list of candidate partitions that always takes higher priority.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieContinuousPartitionTableSource.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.FileIndex;
+import org.apache.hudi.source.prune.DynamicPartitionPruner;
+import org.apache.hudi.source.prune.PartitionPruner;
+import org.apache.hudi.table.format.InternalSchemaManager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.util.ExpressionUtils.extractPartitionPredicateList;
+import static org.apache.hudi.util.ExpressionUtils.filterSimpleCallExpression;
+
+/**
+ * Hoodie streaming table source that performs partition pruning based on the filter conditions at
+ * runtime. This is unlike {@link HoodieTableSource}, where the satisfied partitions are determined in the
+ * optimization phase and never change at runtime.
+ *
+ */
+public class HoodieContinuousPartitionTableSource
+    extends AbstractHoodieTableSource {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieContinuousPartitionTableSource.class);
+
+  private List<ResolvedExpression> partitionFilters;

Review Comment:
   There is no need to bring in another kind of `HoodieTableSource`; try to add the changes to the existing `HoodieTableSource` instead.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/DataPruner.java:
##########
@@ -7,17 +7,19 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.

Review Comment:
   why this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1465073621

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f500063efc2baefad6c6c53e77f25b730b0c0346 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675) 
   * 5b46e06d81e5680024cca39a94f00b2f18c179e0 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677) 
   * e2ceb307219b4e27b276ab986c47bf77a1ec2d25 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1493656956

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 07e8d65356ba30c13740c585d737104f30d674a5 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916) 
   * d2d2f59eb8ed24b0835099deb0c61d8ca993073f Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076) 
   * f763056c7fe6bdfeb44e93bf4df137da2e4afd21 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1155536594


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruner.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Dynamic partition pruner for the hoodie table source, whose partition list is resolved at runtime. Note: data in new partitions created after the job starts can be read if it matches the
+ * filter conditions.
+ */
+public class PartitionPruner implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final Option<List<ExpressionEvaluators.Evaluator>> partitionEvaluatorOpt;
+
+  private final List<String> partitionKeys;
+
+  private final List<DataType> partitionTypes;
+
+  private final String defaultParName;
+
+  private final boolean hivePartition;
+
+  private final Option<Set<String>> candidatePartitionOpt;
+
+  public PartitionPruner(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    this(null, partitionEvaluators, partitionKeys, partitionTypes, defaultParName, hivePartition);
+  }
+
+  public PartitionPruner(Set<String> candidatePartitions) {
+    this(candidatePartitions, null, null, null, null, false);
+  }
+
+  private PartitionPruner(
+      Set<String> candidatePartitions,
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    this.candidatePartitionOpt = Option.ofNullable(candidatePartitions);
+    this.partitionEvaluatorOpt = Option.ofNullable(partitionEvaluators);
+    this.partitionKeys = partitionKeys;
+    this.partitionTypes = partitionTypes;
+    this.defaultParName = defaultParName;
+    this.hivePartition = hivePartition;
+  }
+
+  public Set<String> filter(Collection<String> partitions) {
+    return partitions.stream().filter(this::evaluate).collect(Collectors.toSet());
+  }
+
+  private boolean evaluate(String partition) {
+    if (candidatePartitionOpt.isPresent() && !candidatePartitionOpt.get().contains(partition)) {
+      return false;
+    }
+    if (!partitionEvaluatorOpt.isPresent() || partitionEvaluatorOpt.get().isEmpty()) {
+      return true;
+    }
+    String[] partStrArray = extractPartitionValues(partition, partitionKeys, hivePartition);
+    Map<String, ColumnStats> partStats = new LinkedHashMap<>();
+    for (int idx = 0; idx < partitionKeys.size(); idx++) {
+      String partKey = partitionKeys.get(idx);
+      Object partVal = partKey.equals(defaultParName)
+          ? null : DataTypeUtils.resolvePartition(partStrArray[idx], partitionTypes.get(idx));
+      ColumnStats columnStats = new ColumnStats(partVal, partVal, partVal == null ? 1 : 0);
+      partStats.put(partKey, columnStats);
+    }
+    return partitionEvaluatorOpt.get().stream().allMatch(evaluator -> evaluator.eval(partStats));
+  }
+
+  private static String[] extractPartitionValues(
+      String partitionPath,
+      List<String> partitionKeys,

Review Comment:
   Can reuse the method: `FilePathUtils.extractPartitionKeyValues`



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,123 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts partition predicate from filter condition.
+   * Returns partition predicates and non-partition predicates.
+   */
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> extractPartitionPredicateList(
+      List<ResolvedExpression> exprs,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(exprs, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      int[] partitionIdxMapping = tableRowType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+      for (ResolvedExpression expr : exprs) {

Review Comment:
   Instead of using the int[], is it possible to instantiate it as a set of Integer and filter the candidate expressions by that? (A sketch follows below.)
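   
   A minimal, self-contained sketch of that Set-based check; the class and method names (`PartitionFieldIndices`, `referencesOnlyPartitionFields`) are illustrative, not from the PR:
   
   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;
   
   import org.apache.flink.table.expressions.FieldReferenceExpression;
   import org.apache.flink.table.expressions.ResolvedExpression;
   import org.apache.flink.table.types.logical.RowType;
   
   public final class PartitionFieldIndices {
   
     private PartitionFieldIndices() {
     }
   
     /** Indices (within the table row type) of the fields that are partition keys. */
     public static Set<Integer> of(RowType tableRowType, List<String> partitionKeys) {
       List<String> fieldNames = tableRowType.getFieldNames();
       return IntStream.range(0, fieldNames.size())
           .filter(i -> partitionKeys.contains(fieldNames.get(i)))
           .boxed()
           .collect(Collectors.toSet());
     }
   
     /** True iff every field the expression references is a partition field. */
     public static boolean referencesOnlyPartitionFields(
         ResolvedExpression expr, Set<Integer> partitionFieldIndices) {
       if (expr instanceof FieldReferenceExpression) {
         return partitionFieldIndices.contains(
             ((FieldReferenceExpression) expr).getFieldIndex());
       }
       // Literals have no children; they do not disqualify the enclosing call.
       return expr.getResolvedChildren().stream()
           .allMatch(child -> referencesOnlyPartitionFields(child, partitionFieldIndices));
     }
   }
   ```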



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -86,6 +90,17 @@
  */
 @ExtendWith(FlinkMiniCluster.class)
 public class ITTestHoodieDataSource {
+
+  private static List<RowData> DATA_SET_NEW_PARTITIONS = Arrays.asList(
+      insertRow(
+          StringData.fromString("id9"), StringData.fromString("LiLi"), 24,
+          TimestampData.fromEpochMillis(9), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id11"), StringData.fromString("Baobao"), 34,
+          TimestampData.fromEpochMillis(10), StringData.fromString("par7")),
+      insertRow(StringData.fromString("id12"), StringData.fromString("LinLin"), 34,
+          TimestampData.fromEpochMillis(10), StringData.fromString("part8"))

Review Comment:
   Can we move all the test data into the `TestData` class.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java:
##########
@@ -60,23 +61,36 @@ public class FileIndex {
 
   private final Path path;
   private final RowType rowType;
+  private final boolean tableExists;
   private final HoodieMetadataConfig metadataConfig;
+  private final PartitionPruner partitionPruner;
   private final boolean dataSkippingEnabled;
+  private final DataPruner dataPruner;
   private List<String> partitionPaths;      // cache of partition paths
-  private List<ResolvedExpression> filters; // push down filters
-  private final boolean tableExists;
-  private DataPruner dataPruner;
 
-  private FileIndex(Path path, Configuration conf, RowType rowType) {
+  private FileIndex(Path path, Configuration conf, RowType rowType, DataPruner dataPruner, PartitionPruner partitionPruner) {
     this.path = path;
     this.rowType = rowType;
+    this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
     this.metadataConfig = metadataConfig(conf);
     this.dataSkippingEnabled = conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED);
-    this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
+    // NOTE: Data Skipping is only effective when it references columns that are indexed w/in
+    //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
+    //          - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
+    //          CSI only contains stats for top-level columns, in this case for "struct")
+    //          - Any expression not directly referencing top-level column (for ex, sub-queries, since there's
+    //          nothing CSI in particular could be applied for)
+    if (!metadataConfig.enabled() || !dataSkippingEnabled) {
+      validateConfig();
+      this.dataPruner = null;
+    } else {
+      this.dataPruner = dataPruner;
+    }
+    this.partitionPruner = partitionPruner;

Review Comment:
   Can we move the instantiation of `dataPruner` into a separate method named something like `initDataPruner`? (A sketch follows below.)
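   
   A minimal sketch of the suggested extraction as a method inside `FileIndex`; it mirrors the guard shown in the diff above, and the constructor would then just do `this.dataPruner = initDataPruner(dataPruner);`:
   
   ```java
   private DataPruner initDataPruner(DataPruner dataPruner) {
     // Data skipping only takes effect when the metadata table's column
     // stats are usable; otherwise validate the config and disable pruning.
     if (!metadataConfig.enabled() || !dataSkippingEnabled) {
       validateConfig();
       return null;
     }
     return dataPruner;
   }
   ```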



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruner.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Dynamic partition pruner for the hoodie table source, whose partition list is resolved at runtime. Note: data in new partitions created after the job starts can be read if it matches the
+ * filter conditions.
+ */
+public class PartitionPruner implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final Option<List<ExpressionEvaluators.Evaluator>> partitionEvaluatorOpt;
+
+  private final List<String> partitionKeys;
+
+  private final List<DataType> partitionTypes;
+
+  private final String defaultParName;
+
+  private final boolean hivePartition;
+
+  private final Option<Set<String>> candidatePartitionOpt;
+
+  public PartitionPruner(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,

Review Comment:
   Can we have subclasses for the `PartitionPruner` covering 3 cases:
   
   1. Constant candidate partitions to filter by.
   2. Explicit partition evaluators.
   3. No explicit candidate partitions or partition evaluators, so it should always return true directly.
   
   Define the 3 sub-classes as static inner classes and add a factory method `PartitionPruner#getInstance` to fetch the right pruner instance for the given configuration (sketched below).
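   
   A self-contained sketch of that shape; `PartitionPrunerSketch` and `SerializablePredicate` are illustrative stand-ins for the real class and `ExpressionEvaluators.Evaluator`, and the evaluator wiring is elided:
   
   ```java
   import java.io.Serializable;
   import java.util.Collection;
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   public abstract class PartitionPrunerSketch implements Serializable {
   
     public abstract boolean test(String partition);
   
     public Set<String> filter(Collection<String> partitions) {
       return partitions.stream().filter(this::test).collect(Collectors.toSet());
     }
   
     /** Case 1: a constant candidate partition set decided at compile time. */
     private static class StaticPruner extends PartitionPrunerSketch {
       private final Set<String> candidatePartitions;
   
       StaticPruner(Set<String> candidatePartitions) {
         this.candidatePartitions = candidatePartitions;
       }
   
       @Override
       public boolean test(String partition) {
         return candidatePartitions.contains(partition);
       }
     }
   
     /** Case 2: explicit partition evaluators applied at runtime. */
     private static class DynamicPruner extends PartitionPrunerSketch {
       private final List<SerializablePredicate> evaluators;
   
       DynamicPruner(List<SerializablePredicate> evaluators) {
         this.evaluators = evaluators;
       }
   
       @Override
       public boolean test(String partition) {
         return evaluators.stream().allMatch(e -> e.test(partition));
       }
     }
   
     /** Case 3: nothing to prune by; always accept. */
     private static class AcceptAllPruner extends PartitionPrunerSketch {
       @Override
       public boolean test(String partition) {
         return true;
       }
     }
   
     /** Stand-in for ExpressionEvaluators.Evaluator, simplified to strings. */
     public interface SerializablePredicate extends Serializable {
       boolean test(String partition);
     }
   
     public static PartitionPrunerSketch getInstance(
         Set<String> candidatePartitions, List<SerializablePredicate> evaluators) {
       if (candidatePartitions != null && !candidatePartitions.isEmpty()) {
         return new StaticPruner(candidatePartitions);
       }
       if (evaluators != null && !evaluators.isEmpty()) {
         return new DynamicPruner(evaluators);
       }
       return new AcceptAllPruner();
     }
   }
   ```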



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1892,4 +1992,66 @@ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sin
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  private void testContinuousPartitionPrune(
+      HoodieTableType tableType,
+      boolean hiveStylePartitioning,
+      String filterCondition,
+      List<RowData> results
+  ) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    String sinkDDL = "create table sink(\n"
+        + "  uuid varchar(20),\n"
+        + "  name varchar(20),\n"
+        + "  age int,\n"
+        + "  ts timestamp,\n"
+        + "  part varchar(20)"
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + ")";
+    TableResult tableResult = submitSelectSql(
+        streamTableEnv,
+        "select uuid, name, age, ts, `partition` as part from t1 where " + filterCondition,
+        sinkDDL);
+
+    // write second commit
+    TestData.writeData(DATA_SET_NEW_PARTITIONS, conf);
+    // stop the streaming query and fetch the result
+    List<Row> result = stopAndFetchData(streamTableEnv, tableResult, 10);
+    assertRowsEquals(result, results);
+  }
+
+  private TableResult submitSelectSql(TableEnvironment tEnv, String select, String sinkDDL) {
+    tEnv.executeSql("DROP TABLE IF EXISTS sink");
+    tEnv.executeSql(sinkDDL);
+    TableResult tableResult = tEnv.executeSql("insert into sink " + select);
+    return tableResult;
+  }
+
+  private List<Row> stopAndFetchData(TableEnvironment tEnv, TableResult tableResult, long timeout)
+      throws InterruptedException {
+    // wait for the timeout then cancels the job

Review Comment:
   Doesn't the existing `#execSelectSql` work here?



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java:
##########
@@ -98,7 +98,7 @@ void testFileListingUsingMetadataNonPartitionedTable() throws Exception {
   void testFileListingEmptyTable(boolean enableMetadata) {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(METADATA_ENABLED, enableMetadata);
-    FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE);
+    FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE, null, null);
     List<String> partitionKeys = Collections.singletonList("partition");

Review Comment:
   Can add another factory method with no pruners.
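
   For example, a small overload that delegates with no pruners (a sketch, not the
   merged signature):

   public static FileIndex instance(Path path, Configuration conf, RowType rowType) {
     return instance(path, conf, rowType, null, null);
   }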



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1892,4 +1992,66 @@ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sin
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  private void testContinuousPartitionPrune(
+      HoodieTableType tableType,
+      boolean hiveStylePartitioning,
+      String filterCondition,
+      List<RowData> results
+  ) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());

Review Comment:
   Can we simplify the test case to reuse one input data set and test all kinds of filter conditions?
   
   We can then parameterize over tableType, hiveStylePartitioning, and filterCondition.
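
   A sketch of that parameterization (JUnit 5; the test method name is illustrative,
   while `tableTypeAndFilters` is the Arguments provider discussed later in this thread):

   @ParameterizedTest
   @MethodSource("tableTypeAndFilters")
   void testStreamReadWithPartitionPrune(
       HoodieTableType tableType,
       boolean hiveStylePartitioning,
       String filterCondition,
       List<RowData> expected) throws Exception {
     testContinuousPartitionPrune(tableType, hiveStylePartitioning, filterCondition, expected);
   }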



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1493602244

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 07e8d65356ba30c13740c585d737104f30d674a5 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916) 
   * d2d2f59eb8ed24b0835099deb0c61d8ca993073f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157165945


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java:
##########
@@ -60,23 +61,29 @@ public class FileIndex {
 
   private final Path path;
   private final RowType rowType;
+  private final boolean tableExists;
   private final HoodieMetadataConfig metadataConfig;
+  private final PartitionPruners.PartitionPruner partitionPruner;
   private final boolean dataSkippingEnabled;
+  private final DataPruner dataPruner;
   private List<String> partitionPaths;      // cache of partition paths
-  private List<ResolvedExpression> filters; // push down filters
-  private final boolean tableExists;
-  private DataPruner dataPruner;
 
-  private FileIndex(Path path, Configuration conf, RowType rowType) {
+  private FileIndex(Path path, Configuration conf, RowType rowType, DataPruner dataPruner, PartitionPruners.PartitionPruner partitionPruner) {
     this.path = path;
     this.rowType = rowType;
+    this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
     this.metadataConfig = metadataConfig(conf);
     this.dataSkippingEnabled = conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED);
-    this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
+    this.dataPruner = initializeDataPruner(dataPruner);

Review Comment:
   this.dataPruner = isDataSkippingFeasible() ? dataPruner : null;



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1497139042

   [5880.patch.zip](https://github.com/apache/hudi/files/11156911/5880.patch.zip)
   Thanks for the contribution; I have reviewed it and applied a patch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 merged pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 merged PR #8102:
URL: https://github.com/apache/hudi/pull/8102


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] BruceKellan commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "BruceKellan (via GitHub)" <gi...@apache.org>.
BruceKellan commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1129228090


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,113 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> extractPartitionPredicateList(
+      List<ResolvedExpression> exprs,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(exprs, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      int[] partitionIdxMapping = tableRowType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+      for (ResolvedExpression expr : exprs) {
+        for (CallExpression e : splitByAnd(expr)) {
+          CallExpression convertedExpr = applyMapping(e, partitionIdxMapping);
+          if (convertedExpr != null) {
+            partitionFilters.add(convertedExpr);
+          } else {
+            nonPartitionFilters.add(e);
+          }
+        }
+      }
+      return Tuple2.of(nonPartitionFilters, partitionFilters);
+    }
+  }
+
+  private static List<CallExpression> splitByAnd(ResolvedExpression expr) {
+    List<CallExpression> result = new ArrayList<>();
+    splitByAnd(expr, result);
+    return result;
+  }
+
+  private static void splitByAnd(
+      ResolvedExpression expr,
+      List<CallExpression> result) {
+    if (!(expr instanceof CallExpression)) {
+      return;
+    }
+    CallExpression callExpr = (CallExpression) expr;
+    FunctionDefinition funcDef = callExpr.getFunctionDefinition();
+
+    if (funcDef == BuiltInFunctionDefinitions.AND) {
+      callExpr.getChildren().stream()
+          .filter(child -> child instanceof CallExpression)
+          .forEach(child -> splitByAnd((CallExpression) child, result));
+    } else {
+      result.add(callExpr);
+    }
+  }
+
+  private static CallExpression applyMapping(CallExpression expr, int[] fieldIdxMapping) {
+    FunctionDefinition funcDef = expr.getFunctionDefinition();
+    if (funcDef == BuiltInFunctionDefinitions.IN
+        || funcDef == BuiltInFunctionDefinitions.EQUALS
+        || funcDef == BuiltInFunctionDefinitions.NOT_EQUALS
+        || funcDef == BuiltInFunctionDefinitions.IS_NULL
+        || funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL
+        || funcDef == BuiltInFunctionDefinitions.LESS_THAN
+        || funcDef == BuiltInFunctionDefinitions.GREATER_THAN
+        || funcDef == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL
+        || funcDef == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
+      List<Expression> children = expr.getChildren();
+      List<ResolvedExpression> newChildren = children.stream()
+          .map(
+              child -> {
+                if (child instanceof FieldReferenceExpression) {
+                  FieldReferenceExpression refExpr = (FieldReferenceExpression) child;
+                  int target = fieldIdxMapping[refExpr.getFieldIndex()];
+                  if (target >= 0) {
+                    return new FieldReferenceExpression(
+                        refExpr.getName(),
+                        refExpr.getOutputDataType(),
+                        refExpr.getInputIndex(),
+                        target);
+                  } else {
+                    return null;
+                  }
+                } else {
+                  return (ResolvedExpression) child;
+                }
+              })
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList());
+      if (newChildren.size() == children.size()) {
+        return expr.replaceArgs(newChildren, expr.getOutputDataType());

Review Comment:
   `replaceArgs` does not exist in Flink 1.13
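
   A possible 1.13-compatible alternative is to rebuild the expression instead of
   calling `replaceArgs` (a sketch, assuming the public three-argument
   `CallExpression` constructor that Flink 1.13 still exposes):

   if (newChildren.size() == children.size()) {
     // construct a new CallExpression rather than replacing args on the old one
     return new CallExpression(
         expr.getFunctionDefinition(),
         newChildren,
         expr.getOutputDataType());
   }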



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1478966063

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ad4f77d51bc9a280161865028f89f362fcdb754 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1478890643

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e2ceb307219b4e27b276ab986c47bf77a1ec2d25 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678) 
   * 3ad4f77d51bc9a280161865028f89f362fcdb754 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1495101369

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     }, {
       "hash" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097",
       "triggerID" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * be88d99070504f75c88bfcf48b3c078ca93a35df Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157171143


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Tools to prune partitions.
+ */
+public class PartitionPruners {
+
+  public interface PartitionPruner extends Serializable {
+
+    /**
+     * Applies partition pruning to the given partition list and returns the remaining partitions.
+     */
+    Set<String> filter(Collection<String> partitions);
+  }
+
+  /**
+   * Dynamic partition pruner for hoodie table source which partitions list is available in runtime phase. Note: the data of new partitions created after the job starts could be read if they match the
+   * filter conditions.
+   */
+  public static class DynamicPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<ExpressionEvaluators.Evaluator> partitionEvaluator;
+
+    private final String[] partitionKeys;
+
+    private final List<DataType> partitionTypes;
+
+    private final String defaultParName;
+
+    private final boolean hivePartition;
+
+    private DynamicPartitionPruner(
+        List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+        List<String> partitionKeys,
+        List<DataType> partitionTypes,
+        String defaultParName,
+        boolean hivePartition) {
+      this.partitionEvaluator = partitionEvaluators;
+      this.partitionKeys = partitionKeys.toArray(new String[] {});
+      this.partitionTypes = partitionTypes;
+      this.defaultParName = defaultParName;
+      this.hivePartition = hivePartition;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream().filter(this::evaluate).collect(Collectors.toSet());
+    }
+
+    private boolean evaluate(String partition) {
+      String[] partStrArray = FilePathUtils.extractPartitionKeyValues(
+          new org.apache.hadoop.fs.Path(partition),
+          hivePartition,
+          partitionKeys).values().toArray(new String[] {});
+      Map<String, ColumnStats> partStats = new LinkedHashMap<>();
+      for (int idx = 0; idx < partitionKeys.length; idx++) {
+        String partKey = partitionKeys[idx];
+        Object partVal = partKey.equals(defaultParName)
+            ? null : DataTypeUtils.resolvePartition(partStrArray[idx], partitionTypes.get(idx));
+        ColumnStats columnStats = new ColumnStats(partVal, partVal, partVal == null ? 1 : 0);
+        partStats.put(partKey, columnStats);
+      }
+      return partitionEvaluator.stream().allMatch(evaluator -> evaluator.eval(partStats));
+    }
+  }
+
+  /**
+   * Static partition pruner for the hoodie table source, whose partition list is available at compile time.
+   * After this partition pruner is applied, the hoodie source cannot read data from other partitions at runtime.
+   * Note: data from new partitions created after the job starts will never be read.
+   */
+  public static class StaticPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<String> candidatePartitions;
+
+    private StaticPartitionPruner(Set<String> candidatePartitions) {
+      this.candidatePartitions = candidatePartitions;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream()
+          .filter(this.candidatePartitions::contains).collect(Collectors.toSet());
+    }
+  }
+
+  /**
+   * Fake partition pruner that does not prune any partitions.
+   */
+  public static class FakePartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private FakePartitionPruner() {
+
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      return new HashSet<>(partitions);
+    }
+  }
+
+  public static PartitionPruner getInstance(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    if (partitionEvaluators.isEmpty()) {
+      return new FakePartitionPruner();
+    } else {
+      return new DynamicPartitionPruner(partitionEvaluators, partitionKeys, partitionTypes, defaultParName, hivePartition);
+    }
+  }
+
+  public static PartitionPruner getInstance(Collection<String> candidatePartitions) {
+    Set<String> distinctPartitions = new HashSet<>(candidatePartitions);
+    if (distinctPartitions.isEmpty()) {

Review Comment:
   In which case can the `candidatePartitions` be empty? Is the `FakePartitionPruner` really needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157215306


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1892,4 +1992,66 @@ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sin
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  private void testContinuousPartitionPrune(
+      HoodieTableType tableType,
+      boolean hiveStylePartitioning,
+      String filterCondition,
+      List<RowData> results
+  ) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    String sinkDDL = "create table sink(\n"
+        + "  uuid varchar(20),\n"
+        + "  name varchar(20),\n"
+        + "  age int,\n"
+        + "  ts timestamp,\n"
+        + "  part varchar(20)"
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + ")";
+    TableResult tableResult = submitSelectSql(
+        streamTableEnv,
+        "select uuid, name, age, ts, `partition` as part from t1 where " + filterCondition,
+        sinkDDL);
+
+    // write second commit
+    TestData.writeData(DATA_SET_NEW_PARTITIONS, conf);
+    // stop the streaming query and fetch the result
+    List<Row> result = stopAndFetchData(streamTableEnv, tableResult, 10);
+    assertRowsEquals(result, results);
+  }
+
+  private TableResult submitSelectSql(TableEnvironment tEnv, String select, String sinkDDL) {
+    tEnv.executeSql("DROP TABLE IF EXISTS sink");
+    tEnv.executeSql(sinkDDL);
+    TableResult tableResult = tEnv.executeSql("insert into sink " + select);
+    return tableResult;
+  }
+
+  private List<Row> stopAndFetchData(TableEnvironment tEnv, TableResult tableResult, long timeout)
+      throws InterruptedException {
+    // wait for the timeout then cancels the job

Review Comment:
   I added those tests to verify that new partitions created while the job is running can also be read.
   So I need the hold-on logic here.
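
   For reference, a minimal sketch of such hold-on logic (the class and method names
   are assumptions for illustration, not the merged test code):

   import java.util.concurrent.TimeUnit;

   import org.apache.flink.core.execution.JobClient;
   import org.apache.flink.table.api.TableResult;

   class StreamingQueryStopper {
     /** Waits for the given number of seconds, then cancels the streaming insert job. */
     static void stopAfter(TableResult tableResult, long timeoutSeconds) throws InterruptedException {
       // give the streaming source time to consume the commit written after submission
       TimeUnit.SECONDS.sleep(timeoutSeconds);
       // cancel the continuous query; the test can then read rows from the collect sink
       tableResult.getJobClient().ifPresent(JobClient::cancel);
     }
   }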



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157216410


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1835,6 +1885,75 @@ private static Stream<Arguments> indexAndTableTypeParams() {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (HoodieTableType, hive style partitioning, filter condition, expected result).
+   */
+  private static Stream<Arguments> tableTypeAndFilters() {
+    HoodieTableType[] tableTypes = {HoodieTableType.COPY_ON_WRITE, HoodieTableType.MERGE_ON_READ};
+    boolean [] isHiveStylePartitions = {true, false};
+    Pair<String,  List<RowData>>[] filterAndResults = new Pair[] {
+        Pair.of(
+            "`partition` = 'par5' or `partition` = 'par6'",
+            Arrays.asList(
+                insertRow(StringData.fromString("id9"), StringData.fromString("LiLi"), 24,
+                    TimestampData.fromEpochMillis(9), StringData.fromString("par5")))),
+        Pair.of(
+        "`partition` >= 'par5' and `partition` <= 'par6'",
+        Arrays.asList(
+            insertRow(StringData.fromString("id9"), StringData.fromString("LiLi"), 24,
+                TimestampData.fromEpochMillis(9), StringData.fromString("par5")))),
+        Pair.of(
+        "`partition` <> 'par7'",
+        Arrays.asList(
+            insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+                TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+            insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 33,
+                TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+            insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 53,

Review Comment:
   The tests already reuse one input data set.
   The rows here are the expected output data, which differ depending on the filter condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1494594226

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16076",
       "triggerID" : "d2d2f59eb8ed24b0835099deb0c61d8ca993073f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080",
       "triggerID" : "f763056c7fe6bdfeb44e93bf4df137da2e4afd21",
       "triggerType" : "PUSH"
     }, {
       "hash" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097",
       "triggerID" : "be88d99070504f75c88bfcf48b3c078ca93a35df",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f763056c7fe6bdfeb44e93bf4df137da2e4afd21 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16080) 
   * be88d99070504f75c88bfcf48b3c078ca93a35df Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16097) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1465065055

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f500063efc2baefad6c6c53e77f25b730b0c0346 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675) 
   * 5b46e06d81e5680024cca39a94f00b2f18c179e0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1486455250

   > @danny0405 Thanks for the suggestion. The PR actually already uses the second way you suggest; please see `HoodieContinuousPartitionTableSource`. I keep `HoodieTableSource` at the same time to provide a fallback to the original behavior if a query does not work well with the new `HoodieContinuousPartitionTableSource`.
   
    The original partition push down is not powerful enough; it can only filter out simple partition expressions. What is the corner case of the new push down for which we need a fallback, then?
    
    Does the 1st way work too?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1144209473


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -318,6 +318,13 @@ private FlinkOptions() {
               + "1) you are definitely sure that the consumer reads [faster than/completes before] any clustering instants "
               + "when " + HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n");
 
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_CONTINUOUS_PARTITION_PRUNE = ConfigOptions
+      .key("read.streaming.continuous.partition-prune")
+      .booleanType()
+      .defaultValue(false)// default disable continuous partition pruning
+      .withDescription("Whether to enable continuous partition pruning for streaming read, default false");

Review Comment:
   Can we eliminate this option?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1144209473


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -318,6 +318,13 @@ private FlinkOptions() {
               + "1) you are definitely sure that the consumer reads [faster than/completes before] any clustering instants "
               + "when " + HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n");
 
+  // this option is experimental
+  public static final ConfigOption<Boolean> READ_STREAMING_CONTINUOUS_PARTITION_PRUNE = ConfigOptions
+      .key("read.streaming.continuous.partition-prune")
+      .booleanType()
+      .defaultValue(false)// default disable continuous partition pruning
+      .withDescription("Whether to enable continuous partition pruning for streaming read, default false");

Review Comment:
   Can we eliminate this option? We should try our best for this optimization; if any exceptional use case is detected, just fall back to the original behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1464956060

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f500063efc2baefad6c6c53e77f25b730b0c0346 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1464899815

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9fd31f69d5ae5a349a553cb207337833958c7099 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1464923325

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9fd31f69d5ae5a349a553cb207337833958c7099 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674) 
   * f500063efc2baefad6c6c53e77f25b730b0c0346 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1483851191

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15593",
       "triggerID" : "8958c445f7354ed113e8ff0a1aa9f56261bede34",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15674",
       "triggerID" : "9fd31f69d5ae5a349a553cb207337833958c7099",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15675",
       "triggerID" : "f500063efc2baefad6c6c53e77f25b730b0c0346",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15677",
       "triggerID" : "5b46e06d81e5680024cca39a94f00b2f18c179e0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15678",
       "triggerID" : "e2ceb307219b4e27b276ab986c47bf77a1ec2d25",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840",
       "triggerID" : "3ad4f77d51bc9a280161865028f89f362fcdb754",
       "triggerType" : "PUSH"
     }, {
       "hash" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916",
       "triggerID" : "07e8d65356ba30c13740c585d737104f30d674a5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ad4f77d51bc9a280161865028f89f362fcdb754 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15840) 
   * 07e8d65356ba30c13740c585d737104f30d674a5 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15916) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1497310962

   ## CI report:
   
   * daabef5271b668f376ff2f7e82071d56c717cbbe Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16126) 
   * a55fbb4f7e076f58e31ffd8b66280b9af08ceef4 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16138) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1497369912

   ## CI report:
   
   * daabef5271b668f376ff2f7e82071d56c717cbbe Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16126) 
   * a55fbb4f7e076f58e31ffd8b66280b9af08ceef4 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16138) 
   * d549b98077efb89ce3e5edeebdb54088f26d345c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157164631


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java:
##########
@@ -309,6 +287,13 @@ private DataPruner initializeDataPruner(List<ResolvedExpression> filters) {
       validateConfig();
       return null;
     }
-    return DataPruner.newInstance(filters);
+    return dataPruner;
+  }
+
+  private void validateConfig() {

Review Comment:
   `validateConfig` -> `isDataSkippingFeasible`, and return a boolean instead of void.
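
   For illustration, a minimal sketch of the suggested change (everything except the `isDataSkippingFeasible` name is an assumption for illustration, not the PR's actual code):

   public class FileIndexSketch {
     interface DataPruner {}  // stand-in for the PR's DataPruner type

     private boolean dataSkippingEnabled;  // e.g. driven by a read option
     private boolean metadataEnabled;      // e.g. metadata table available

     // The suggested rename: a boolean predicate instead of a void validateConfig().
     private boolean isDataSkippingFeasible() {
       if (dataSkippingEnabled && !metadataEnabled) {
         // data skipping needs the metadata table; fall back to a full scan
         return false;
       }
       return dataSkippingEnabled;
     }

     // Caller side, mirroring the diff: return the pruner only when feasible.
     private DataPruner getDataPruner(DataPruner dataPruner) {
       return isDataSkippingFeasible() ? dataPruner : null;
     }
   }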



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] beyond1920 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "beyond1920 (via GitHub)" <gi...@apache.org>.
beyond1920 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157229473


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,123 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts partition predicates from the filter conditions.
+   * Returns both the partition predicates and the non-partition predicates.
+   */
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> extractPartitionPredicateList(
+      List<ResolvedExpression> exprs,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(exprs, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      int[] partitionIdxMapping = tableRowType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+      for (ResolvedExpression expr : exprs) {

Review Comment:
   I need a mapping from the partition key's index in the original table source to its index in the partition keys list.
   Do you mean changing it to a Map?
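
   For illustration, a self-contained sketch of the two shapes under discussion (field and partition names are made up): the int[] form from the diff stores, for each field position in the table row type, that field's index in the partition keys list (-1 for non-partition fields), while a Map keeps entries for the partition fields only.

   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class PartitionIdxMappingSketch {
     public static void main(String[] args) {
       List<String> fieldNames = Arrays.asList("id", "name", "dt", "hh");
       List<String> partitionKeys = Arrays.asList("dt", "hh");

       // Array form, as in the diff: position i holds the index of field i
       // inside partitionKeys, or -1 when the field is not a partition key.
       int[] partitionIdxMapping = fieldNames.stream()
           .mapToInt(partitionKeys::indexOf)
           .toArray();
       System.out.println(Arrays.toString(partitionIdxMapping)); // [-1, -1, 0, 1]

       // Map alternative raised in the review: only partition fields are kept.
       Map<Integer, Integer> fieldIdxToPartitionIdx = new HashMap<>();
       for (int i = 0; i < fieldNames.size(); i++) {
         int p = partitionKeys.indexOf(fieldNames.get(i));
         if (p >= 0) {
           fieldIdxToPartitionIdx.put(i, p);
         }
       }
       System.out.println(fieldIdxToPartitionIdx); // {2=0, 3=1}
     }
   }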



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1496041269

   ## CI report:
   
   * a66c8ec83a1a8e75d1e28c3e7444b7c3306049a6 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16106) 
   * daabef5271b668f376ff2f7e82071d56c717cbbe Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16126) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157169377


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Tools to prune partitions.
+ */
+public class PartitionPruners {
+
+  public interface PartitionPruner extends Serializable {
+
+    /**
+     * Applies partition pruning to the given partition list and returns the remaining partitions.
+     */
+    Set<String> filter(Collection<String> partitions);
+  }
+
+  /**
+   * Dynamic partition pruner for a hoodie table source whose partition list is only available at runtime.
+   * Note: data from new partitions created after the job starts can still be read if it matches the filter conditions.
+   */
+  public static class DynamicPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<ExpressionEvaluators.Evaluator> partitionEvaluator;
+
+    private final String[] partitionKeys;
+
+    private final List<DataType> partitionTypes;
+
+    private final String defaultParName;
+
+    private final boolean hivePartition;
+
+    private DynamicPartitionPruner(
+        List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+        List<String> partitionKeys,
+        List<DataType> partitionTypes,
+        String defaultParName,
+        boolean hivePartition) {
+      this.partitionEvaluator = partitionEvaluators;
+      this.partitionKeys = partitionKeys.toArray(new String[] {});
+      this.partitionTypes = partitionTypes;
+      this.defaultParName = defaultParName;
+      this.hivePartition = hivePartition;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream().filter(this::evaluate).collect(Collectors.toSet());
+    }
+
+    private boolean evaluate(String partition) {
+      String[] partStrArray = FilePathUtils.extractPartitionKeyValues(
+          new org.apache.hadoop.fs.Path(partition),
+          hivePartition,
+          partitionKeys).values().toArray(new String[] {});
+      Map<String, ColumnStats> partStats = new LinkedHashMap<>();
+      for (int idx = 0; idx < partitionKeys.length; idx++) {
+        String partKey = partitionKeys[idx];
+        Object partVal = partKey.equals(defaultParName)
+            ? null : DataTypeUtils.resolvePartition(partStrArray[idx], partitionTypes.get(idx));
+        ColumnStats columnStats = new ColumnStats(partVal, partVal, partVal == null ? 1 : 0);
+        partStats.put(partKey, columnStats);
+      }
+      return partitionEvaluator.stream().allMatch(evaluator -> evaluator.eval(partStats));
+    }
+  }
+
+  /**
+   * Static partition pruner for a hoodie table source whose partition list is available at compile time.
+   * After applying this partition pruner, the hoodie source cannot read data from other partitions at runtime.
+   * Note: data from new partitions created after the job starts would never be read.
+   */
+  public static class StaticPartitionPruner implements PartitionPruner {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<String> candidatePartitions;
+
+    private StaticPartitionPruner(Set<String> candidatePartitions) {
+      this.candidatePartitions = candidatePartitions;
+    }
+
+    public Set<String> filter(Collection<String> partitions) {
+      return partitions.stream()
+          .filter(this.candidatePartitions::contains).collect(Collectors.toSet());
+    }
+  }
+
+  /**
+   * Fake partition pruner that does not prune any partitions.
+   */
+  public static class FakePartitionPruner implements PartitionPruner {
+

Review Comment:
   `Fake` -> `Dummy`
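
   As context for the pruner contract in the diff above, a hedged usage sketch (the concrete pruner below is a stand-in lambda, not the PR's implementation; the partition path values are made up):

   import java.util.Arrays;
   import java.util.Collection;
   import java.util.Set;
   import java.util.stream.Collectors;

   public class PartitionPrunerUsageSketch {
     // Mirrors the PartitionPruner contract from the diff (minus Serializable).
     interface PartitionPruner {
       Set<String> filter(Collection<String> partitions);
     }

     public static void main(String[] args) {
       // Stand-in pruner that keeps hive-style partitions for a single day.
       PartitionPruner pruner = partitions -> partitions.stream()
           .filter(p -> p.startsWith("dt=2023-03-06"))
           .collect(Collectors.toSet());

       System.out.println(pruner.filter(
           Arrays.asList("dt=2023-03-05/hh=23", "dt=2023-03-06/hh=00")));
       // prints [dt=2023-03-06/hh=00]
     }
   }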



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "danny0405 (via GitHub)" <gi...@apache.org>.
danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157173346


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -234,24 +234,18 @@ public String asSummaryString() {
 
   @Override
   public Result applyFilters(List<ResolvedExpression> filters) {
-    List<ResolvedExpression> callExpressionFilters = filters.stream()
-        .filter(ExpressionUtils::isSimpleCallExpression)
-        .collect(Collectors.toList());
-    this.fileIndex.setFilters(callExpressionFilters);
+    List<ResolvedExpression> simpleFilters = filterSimpleCallExpression(filters);
+    Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters =
+        extractPartitionPredicateList(
+            simpleFilters,

Review Comment:
   extractPartitionPredicateList -> splitExpressionsByPartitionPredicate



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -234,24 +234,18 @@ public String asSummaryString() {
 
   @Override
   public Result applyFilters(List<ResolvedExpression> filters) {
-    List<ResolvedExpression> callExpressionFilters = filters.stream()
-        .filter(ExpressionUtils::isSimpleCallExpression)
-        .collect(Collectors.toList());
-    this.fileIndex.setFilters(callExpressionFilters);
+    List<ResolvedExpression> simpleFilters = filterSimpleCallExpression(filters);
+    Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters =
+        extractPartitionPredicateList(
+            simpleFilters,

Review Comment:
   extractPartitionPredicateList -> splitExpressionsByPartitionFilters
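
   For illustration, a simplified, self-contained sketch of the split the suggested name describes (strings stand in for ResolvedExpression; this is not the PR's final code):

   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.Predicate;

   public class SplitFiltersSketch {
     // Partition the filter list into (partition predicates, non-partition predicates).
     static List<List<String>> splitExpressionsByPartitionPredicate(
         List<String> exprs, Predicate<String> isPartitionPredicate) {
       List<String> partitionFilters = new ArrayList<>();
       List<String> nonPartitionFilters = new ArrayList<>();
       for (String e : exprs) {
         (isPartitionPredicate.test(e) ? partitionFilters : nonPartitionFilters).add(e);
       }
       return Arrays.asList(partitionFilters, nonPartitionFilters);
     }

     public static void main(String[] args) {
       System.out.println(splitExpressionsByPartitionPredicate(
           Arrays.asList("dt = '2023-03-06'", "uuid = 'abc'"),
           e -> e.startsWith("dt")));
       // prints [[dt = '2023-03-06'], [uuid = 'abc']]
     }
   }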



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8102:
URL: https://github.com/apache/hudi/pull/8102#issuecomment-1496697343

   ## CI report:
   
   * daabef5271b668f376ff2f7e82071d56c717cbbe Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16126) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org