You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "danny0405 (via GitHub)" <gi...@apache.org> on 2023/04/03 07:50:35 UTC

[GitHub] [hudi] danny0405 commented on a diff in pull request #8102: [HUDI-5880] Support partition pruning for flink streaming source in runtime

danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1155536594


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruner.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Dynamic partition pruner for hoodie table source which partitions list is available in runtime phase. Note: the data of new partitions created after the job starts could be read if they match the
+ * filter conditions.
+ */
+public class PartitionPruner implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final Option<List<ExpressionEvaluators.Evaluator>> partitionEvaluatorOpt;
+
+  private final List<String> partitionKeys;
+
+  private final List<DataType> partitionTypes;
+
+  private final String defaultParName;
+
+  private final boolean hivePartition;
+
+  private final Option<Set<String>> candidatePartitionOpt;
+
+  public PartitionPruner(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    this(null, partitionEvaluators, partitionKeys, partitionTypes, defaultParName, hivePartition);
+  }
+
+  public PartitionPruner(Set<String> candidatePartitions) {
+    this(candidatePartitions, null, null, null, null, false);
+  }
+
+  private PartitionPruner(
+      Set<String> candidatePartitions,
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,
+      List<DataType> partitionTypes,
+      String defaultParName,
+      boolean hivePartition) {
+    this.candidatePartitionOpt = Option.ofNullable(candidatePartitions);
+    this.partitionEvaluatorOpt = Option.ofNullable(partitionEvaluators);
+    this.partitionKeys = partitionKeys;
+    this.partitionTypes = partitionTypes;
+    this.defaultParName = defaultParName;
+    this.hivePartition = hivePartition;
+  }
+
+  public Set<String> filter(Collection<String> partitions) {
+    return partitions.stream().filter(this::evaluate).collect(Collectors.toSet());
+  }
+
+  private boolean evaluate(String partition) {
+    if (candidatePartitionOpt.isPresent() && !candidatePartitionOpt.get().contains(partition)) {
+      return false;
+    }
+    if (!partitionEvaluatorOpt.isPresent() || partitionEvaluatorOpt.get().isEmpty()) {
+      return true;
+    }
+    String[] partStrArray = extractPartitionValues(partition, partitionKeys, hivePartition);
+    Map<String, ColumnStats> partStats = new LinkedHashMap<>();
+    for (int idx = 0; idx < partitionKeys.size(); idx++) {
+      String partKey = partitionKeys.get(idx);
+      Object partVal = partKey.equals(defaultParName)
+          ? null : DataTypeUtils.resolvePartition(partStrArray[idx], partitionTypes.get(idx));
+      ColumnStats columnStats = new ColumnStats(partVal, partVal, partVal == null ? 1 : 0);
+      partStats.put(partKey, columnStats);
+    }
+    return partitionEvaluatorOpt.get().stream().allMatch(evaluator -> evaluator.eval(partStats));
+  }
+
+  private static String[] extractPartitionValues(
+      String partitionPath,
+      List<String> partitionKeys,

Review Comment:
   Can reuse the method: `FilePathUtils.extractPartitionKeyValues`



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,123 @@ public static Object getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + logicalType);
     }
   }
+
+  public static List<ResolvedExpression> filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts partition predicate from filter condition.
+   * Returns partition predicates and non-partition predicates.
+   */
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> extractPartitionPredicateList(
+      List<ResolvedExpression> exprs,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(exprs, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      int[] partitionIdxMapping = tableRowType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+      for (ResolvedExpression expr : exprs) {

Review Comment:
   Instead of using an `int[]`, is it possible to instantiate it as a set of `Integer` and filter the candidate expressions by that?



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -86,6 +90,17 @@
  */
 @ExtendWith(FlinkMiniCluster.class)
 public class ITTestHoodieDataSource {
+
+  private static List<RowData> DATA_SET_NEW_PARTITIONS = Arrays.asList(
+      insertRow(
+          StringData.fromString("id9"), StringData.fromString("LiLi"), 24,
+          TimestampData.fromEpochMillis(9), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id11"), StringData.fromString("Baobao"), 34,
+          TimestampData.fromEpochMillis(10), StringData.fromString("par7")),
+      insertRow(StringData.fromString("id12"), StringData.fromString("LinLin"), 34,
+          TimestampData.fromEpochMillis(10), StringData.fromString("part8"))

Review Comment:
   Can we move all the test data into the class `TestData`?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java:
##########
@@ -60,23 +61,36 @@ public class FileIndex {
 
   private final Path path;
   private final RowType rowType;
+  private final boolean tableExists;
   private final HoodieMetadataConfig metadataConfig;
+  private final PartitionPruner partitionPruner;
   private final boolean dataSkippingEnabled;
+  private final DataPruner dataPruner;
   private List<String> partitionPaths;      // cache of partition paths
-  private List<ResolvedExpression> filters; // push down filters
-  private final boolean tableExists;
-  private DataPruner dataPruner;
 
-  private FileIndex(Path path, Configuration conf, RowType rowType) {
+  private FileIndex(Path path, Configuration conf, RowType rowType, DataPruner dataPruner, PartitionPruner partitionPruner) {
     this.path = path;
     this.rowType = rowType;
+    this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
     this.metadataConfig = metadataConfig(conf);
     this.dataSkippingEnabled = conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED);
-    this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf));
+    // NOTE: Data Skipping is only effective when it references columns that are indexed w/in
+    //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
+    //          - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
+    //          CSI only contains stats for top-level columns, in this case for "struct")
+    //          - Any expression not directly referencing top-level column (for ex, sub-queries, since there's
+    //          nothing CSI in particular could be applied for)
+    if (!metadataConfig.enabled() || !dataSkippingEnabled) {
+      validateConfig();
+      this.dataPruner = null;
+    } else {
+      this.dataPruner = dataPruner;
+    }
+    this.partitionPruner = partitionPruner;

Review Comment:
   Can we move the instantiation of `dataPruner` into a separate method, named something like `initDataPruner`?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruner.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.prune;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.util.DataTypeUtils;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Dynamic partition pruner for hoodie table source which partitions list is available in runtime phase. Note: the data of new partitions created after the job starts could be read if they match the
+ * filter conditions.
+ */
+public class PartitionPruner implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final Option<List<ExpressionEvaluators.Evaluator>> partitionEvaluatorOpt;
+
+  private final List<String> partitionKeys;
+
+  private final List<DataType> partitionTypes;
+
+  private final String defaultParName;
+
+  private final boolean hivePartition;
+
+  private final Option<Set<String>> candidatePartitionOpt;
+
+  public PartitionPruner(
+      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
+      List<String> partitionKeys,

Review Comment:
   Can we have subclasses of `PartitionPruner` for these 3 cases:
   
   1. Constant candidate filters to filter by.
   2. Explicit partition evaluators.
   3. No explicit candidate filters or partition evaluators, in which case it should always return true directly.
   
   Define the 3 subclasses as static inner classes and add a factory method `PartitionPruner#getInstance` to fetch the right pruner instance for the given configuration.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1892,4 +1992,66 @@ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sin
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  private void testContinuousPartitionPrune(
+      HoodieTableType tableType,
+      boolean hiveStylePartitioning,
+      String filterCondition,
+      List<RowData> results
+  ) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    String sinkDDL = "create table sink(\n"
+        + "  uuid varchar(20),\n"
+        + "  name varchar(20),\n"
+        + "  age int,\n"
+        + "  ts timestamp,\n"
+        + "  part varchar(20)"
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + ")";
+    TableResult tableResult = submitSelectSql(
+        streamTableEnv,
+        "select uuid, name, age, ts, `partition` as part from t1 where " + filterCondition,
+        sinkDDL);
+
+    // write second commit
+    TestData.writeData(DATA_SET_NEW_PARTITIONS, conf);
+    // stop the streaming query and fetch the result
+    List<Row> result = stopAndFetchData(streamTableEnv, tableResult, 10);
+    assertRowsEquals(result, results);
+  }
+
+  private TableResult submitSelectSql(TableEnvironment tEnv, String select, String sinkDDL) {
+    tEnv.executeSql("DROP TABLE IF EXISTS sink");
+    tEnv.executeSql(sinkDDL);
+    TableResult tableResult = tEnv.executeSql("insert into sink " + select);
+    return tableResult;
+  }
+
+  private List<Row> stopAndFetchData(TableEnvironment tEnv, TableResult tableResult, long timeout)
+      throws InterruptedException {
+    // wait for the timeout then cancels the job

Review Comment:
   The existing `#execSelectSql` does not work here?



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java:
##########
@@ -98,7 +98,7 @@ void testFileListingUsingMetadataNonPartitionedTable() throws Exception {
   void testFileListingEmptyTable(boolean enableMetadata) {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(METADATA_ENABLED, enableMetadata);
-    FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE);
+    FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE, null, null);
     List<String> partitionKeys = Collections.singletonList("partition");

Review Comment:
   Can add another factory method with no pruners.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1892,4 +1992,66 @@ private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sin
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  private void testContinuousPartitionPrune(
+      HoodieTableType tableType,
+      boolean hiveStylePartitioning,
+      String filterCondition,
+      List<RowData> results
+  ) throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());

Review Comment:
   Can we simplify the test case to reuse one input data set and test all kinds of filter conditions?
   
   We can then parameterize the params: tableType, hiveStylePartitioning and filterConditions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org